Commit fb3cb89c authored by Your Name's avatar Your Name

init go-kafuka

parent 22888bbf
package main
import (
"sync"
pbUpstream "github.com/odysseus/odysseus-protocol/gen/proto/go/base/v1"
)
type apiQueryTxsForAddr struct {
uuid string
res chan pbUpstream.TaskResponse
}
var ApiQueryTxsByAddrForQueue = make(chan apiQueryTxsForAddr, 1000)
func req(uuid string) chan pbUpstream.TaskResponse {
//ApiQueryTxsByAddrForQueue
res := make(chan pbUpstream.TaskResponse)
ApiQueryTxsByAddrForQueue <- apiQueryTxsForAddr{
uuid: uuid,
res: res,
}
return res
}
func res(res pbUpstream.TaskResponse) {
if v, ok := recordmap.LoadAndDelete(res.TaskId); ok {
resAsV, ok := v.(chan pbUpstream.TaskResponse)
if ok {
resAsV <- res
}
}
}
var recordmap sync.Map
func recordUUID() {
for message := range ApiQueryTxsByAddrForQueue {
recordmap.Store(message.uuid, message.res)
}
}
// res := make(chan types.Transactions)
// t.ApiQueryTxsByAddrForQueue <- apiQueryTxsForAddr{
// addr: (metaTypes.Address)(addr),
// res: res,
// }
// queueTxs := <-res
curl https://api.openai.com/v1/images/generations \
-H "Content-Type: application/json" \
-H "Authorization: Bearer $OPENAI_API_KEY" \
-d '{
"model": "dall-e-3",
"prompt": "A cute baby sea otter",
"n": 1,
"size": "1024x1024"
}'
{
"created": 1589478378,
"data": [
{
"url": "https://..."
},
{
"url": "https://..."
}
]
}
curl https://api.openai.com/v1/images/edits \
-H "Authorization: Bearer $OPENAI_API_KEY" \
-F image="@otter.png" \
-F mask="@mask.png" \
-F prompt="A cute baby sea otter wearing a beret" \
-F n=2 \
-F size="1024x1024"
{
"created": 1589478378,
"data": [
{
"url": "https://..."
},
{
"url": "https://..."
}
]
}
curl https://api.openai.com/v1/images/variations \
-H "Authorization: Bearer $OPENAI_API_KEY" \
-F image="@otter.png" \
-F n=2 \
-F size="1024x1024"
{
"created": 1589478378,
"data": [
{
"url": "https://..."
},
{
"url": "https://..."
}
]
}
curl https://api.openai.com/v1/chat/completions \
-H "Content-Type: application/json" \
-H "Authorization: Bearer $OPENAI_API_KEY" \
-d '{
"model": "gpt-3.5-turbo",
"messages": [
{
"role": "system",
"content": "You are a helpful assistant."
},
{
"role": "user",
"content": "Hello!"
}
]
}'
{
"id": "chatcmpl-123",
"object": "chat.completion",
"created": 1677652288,
"model": "gpt-3.5-turbo-0613",
"system_fingerprint": "fp_44709d6fcb",
"choices": [{
"index": 0,
"message": {
"role": "assistant",
"content": "\n\nHello there, how may I assist you today?",
},
"logprobs": null,
"finish_reason": "stop"
}],
"usage": {
"prompt_tokens": 9,
"completion_tokens": 12,
"total_tokens": 21
}
}
curl http://192.168.1.220:8000/v1/chat/completions \
-H "Content-Type: application/json" \
-H "Authorization: Bearer $OPENAI_API_KEY" \
-d '{
"model": "gpt-3.5-turbo",
"messages": [
{
"role": "system",
"content": "You are a helpful assistant."
},
{
"role": "user",
"content": "Hello!"
}
]
}'
curl http://192.168.1.220:8000/v1/images/generations \
-H "Content-Type: application/json" \
-H "Authorization: Bearer $OPENAI_API_KEY" \
-d '{
"model": "dall-e-3",
"prompt": "A cute baby sea otter",
"n": 1,
"size": "1024x1024"
}'
{
"created": 1589478378,
"data": [
{
"url": "https://..."
},
{
"url": "https://..."
}
]
}
{
"modelType": "picture",
"model": "aipaint",
"desc": "dogs",
"token": "eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9.eyJ0aW1lU3RhbXAiOjE3MDQwODUzMjA3MjQsInVzZXJSb2xlIjoiY29tbW9uIiwidXNlcklkIjoiNDMifQ.EozgbLG9qIDU6og4t0RWyZ7Sl2m2PSRrqjRd4QVfkBQ"
}
curl http://192.168.1.220:8000/v1/images/generations \
-H "Content-Type: application/json" \
-H "Authorization: C1c9oY1c1ejsLWhFqqVo2eMvww6ZfQ4G" \
-d '{
"modelType": "picture",
"model": "aipaint",
"desc": "dogs",
"token": "eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9.eyJ0aW1lU3RhbXAiOjE3MDQwODUzMjA3MjQsInVzZXJSb2xlIjoiY29tbW9uIiwidXNlcklkIjoiNDMifQ.EozgbLG9qIDU6og4t0RWyZ7Sl2m2PSRrqjRd4QVfkBQ"
}'
curl http://192.168.1.220:8000/v1/images/generations \
-H "Content-Type: application/json" \
-H "Authorization: C1c9oY1c1ejsLWhFqqVo2eMvww6ZfQ4G" \
-d '{
"model": "dall-e-3",
"prompt": "A cute baby sea otter",
"n": 1,
"size": "1024x1024"
}'
curl http://192.168.1.220:8000/v1/chat/completions \
-H "Content-Type: application/json" \
-H "Authorization: C1c9oY1c1ejsLWhFqqVo2eMvww6ZfQ4G" \
-d '{
"model": "gpt-3.5-turbo",
"messages": [
{
"role": "system",
"content": "You are a helpful assistant."
},
{
"role": "user",
"content": "Hello!"
}
]
}'
curl http://192.168.1.220:8000/v1/chat/completions \
-H "Content-Type: application/json" \
-H "Authorization: C1c9oY1c1ejsLWhFqqVo2eMvww6ZfQ4G" \
-d '{
"modelType": "language",
"model": "chat",
"desc": "hello",
"token": "eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9.eyJ0aW1lU3RhbXAiOjE3MDQwODUzMjA3MjQsInVzZXJSb2xlIjoiY29tbW9uIiwidXNlcklkIjoiNDMifQ.EozgbLG9qIDU6og4t0RWyZ7Sl2m2PSRrqjRd4QVfkBQ"
}'
https://docs.aigic.ai/text-to-image
{
"modelType": "picture",
"model": "aipaint",
"desc": "dogs",
"token": "eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9.eyJ0aW1lU3RhbXAiOjE3MDQwODUzMjA3MjQsInVzZXJSb2xlIjoiY29tbW9uIiwidXNlcklkIjoiNDMifQ.EozgbLG9qIDU6og4t0RWyZ7Sl2m2PSRrqjRd4QVfkBQ"
}
{
"code": 200,
"msg": null,
"content": "https://paint4art.oss-cn-beijing.aliyuncs.com/aiimages/SN90P6aKwh.jpg"
}
{
"modelType": "language",
"model": "chat",
"desc": "hello",
"token": "eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9.eyJ0aW1lU3RhbXAiOjE3MDQwODUzMjA3MjQsInVzZXJSb2xlIjoiY29tbW9uIiwidXNlcklkIjoiNDMifQ.EozgbLG9qIDU6og4t0RWyZ7Sl2m2PSRrqjRd4QVfkBQ"
}
{
"code": 200,
"msg": null,
"content": "Hello! How can I assist you today?"
}
curl http://192.168.1.220:6000/v1/images/generations \
-H "Content-Type: application/json" \
-H "Authorization: C1c9oY1c1ejsLWhFqqVo2eMvww6ZfQ4G" \
-d '{
"modelType": "picture",
"model": "aipaint",
"desc": "dogs",
"token": "eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9.eyJ0aW1lU3RhbXAiOjE3MDQwODUzMjA3MjQsInVzZXJSb2xlIjoiY29tbW9uIiwidXNlcklkIjoiNDMifQ.EozgbLG9qIDU6og4t0RWyZ7Sl2m2PSRrqjRd4QVfkBQ"
}'
......@@ -7,6 +7,11 @@ require (
github.com/gofiber/fiber/v2 v2.48.0
)
require (
github.com/gogo/protobuf v1.3.2 // indirect
google.golang.org/protobuf v1.32.0 // indirect
)
require (
github.com/andybalholm/brotli v1.0.5 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
......@@ -27,13 +32,17 @@ require (
github.com/mattn/go-colorable v0.1.13 // indirect
github.com/mattn/go-isatty v0.0.19 // indirect
github.com/mattn/go-runewidth v0.0.15 // indirect
github.com/odysseus/odysseus-protocol v0.0.0-00010101000000-000000000000
github.com/pierrec/lz4/v4 v4.1.17 // indirect
github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475 // indirect
github.com/rivo/uniseg v0.4.4 // indirect
github.com/valyala/bytebufferpool v1.0.0 // indirect
github.com/valyala/fasthttp v1.48.0 // indirect
github.com/valyala/tcplisten v1.0.0 // indirect
golang.org/x/crypto v0.7.0 // indirect
golang.org/x/net v0.8.0 // indirect
golang.org/x/sys v0.10.0 // indirect
golang.org/x/crypto v0.14.0 // indirect
golang.org/x/net v0.16.0 // indirect
golang.org/x/sys v0.13.0 // indirect
)
replace github.com/odysseus/odysseus-protocol => ../odysseus-protocol
......@@ -15,8 +15,11 @@ github.com/eapache/queue v1.1.0/go.mod h1:6eCeP0CKFpHLu8blIFXhExK/dRa7WDZfr6jVFP
github.com/fortytw2/leaktest v1.3.0 h1:u8491cBMTQ8ft8aeV+adlcytMZylmA5nnwwkRZjI8vw=
github.com/gofiber/fiber/v2 v2.48.0 h1:cRVMCb9aUJDsyHxGFLwz/sGzDggdailZZyptU9F9cU0=
github.com/gofiber/fiber/v2 v2.48.0/go.mod h1:xqJgfqrc23FJuqGOW6DVgi3HyZEm2Mn9pRqUb2kHSX8=
github.com/gogo/protobuf v1.3.2 h1:Ov1cvc58UF3b5XjBnZv7+opcTcQFZebYjWzi34vdm4Q=
github.com/gogo/protobuf v1.3.2/go.mod h1:P1XiOD3dCwIKUDQYPy72D8LYyHL2YPYrpS2s69NZV8Q=
github.com/golang/snappy v0.0.4 h1:yAGX7huGHXlcLOEtBnF4w7FQwA26wojNCwOYAEhLjQM=
github.com/golang/snappy v0.0.4/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
github.com/google/go-cmp v0.5.5 h1:Khx7svrCpmxxtHBq5j2mp/xVjsi8hQMfNLvJFAlrGgU=
github.com/google/uuid v1.3.0 h1:t6JiXgmwXMjEs8VusXIJk2BXHsn+wx8BZdTaoZ5fu7I=
github.com/google/uuid v1.3.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/gorilla/securecookie v1.1.1/go.mod h1:ra0sb63/xPlUeL+yeDciTfxMRAA+MP+HVt/4epWDjd4=
......@@ -40,6 +43,8 @@ github.com/jcmturner/gokrb5/v8 v8.4.3 h1:iTonLeSJOn7MVUtyMT+arAn5AKAPrkilzhGw8wE
github.com/jcmturner/gokrb5/v8 v8.4.3/go.mod h1:dqRwJGXznQrzw6cWmyo6kH+E7jksEQG/CyVWsJEsJO0=
github.com/jcmturner/rpc/v2 v2.0.3 h1:7FXXj8Ti1IaVFpSAziCZWNzbNuZmnvw/i6CqLNdWfZY=
github.com/jcmturner/rpc/v2 v2.0.3/go.mod h1:VUJYCIDm3PVOEHw8sgt091/20OJjskO/YJki3ELg/Hc=
github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8=
github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck=
github.com/klauspost/compress v1.16.7 h1:2mk3MPGNzKyxErAw8YaohYh69+pa4sIQSC0fPGCFR9I=
github.com/klauspost/compress v1.16.7/go.mod h1:ntbaceVETuRiXiv4DpjP66DpAtAGkEQskQzEyD//IeE=
github.com/mattn/go-colorable v0.1.13 h1:fFA4WZxdEF4tXPZVKMLwD8oUnCTTo08duU7wxecdEvA=
......@@ -70,31 +75,57 @@ github.com/valyala/fasthttp v1.48.0 h1:oJWvHb9BIZToTQS3MuQ2R3bJZiNSa2KiNdeI8A+79
github.com/valyala/fasthttp v1.48.0/go.mod h1:k2zXd82h/7UZc3VOdJ2WaUqt1uZ/XpXAfE9i+HBC3lA=
github.com/valyala/tcplisten v1.0.0 h1:rBHj/Xf+E1tRGZyWIWwJDiRY0zc1Js+CV5DqwacVSA8=
github.com/valyala/tcplisten v1.0.0/go.mod h1:T0xQ8SeCZGxckz9qRXTfG43PvQ/mcWh7FwZEA7Ioqkc=
github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
golang.org/x/crypto v0.0.0-20220722155217-630584e8d5aa/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4=
golang.org/x/crypto v0.7.0 h1:AvwMYaRytfdeVt3u6mLaxYtErKYjxA2OXjJ1HHq6t3A=
golang.org/x/crypto v0.7.0/go.mod h1:pYwdfH91IfpZVANVyUOhSIPZaFoJGxTFbZhFTx+dXZU=
golang.org/x/crypto v0.14.0 h1:wBqGXzWJW6m1XrIKlAH0Hs1JJ7+9KBwnIO8v66Q9cHc=
golang.org/x/crypto v0.14.0/go.mod h1:MVFd36DqK4CsrnJYDkBA3VC4m2GkXAM0PvzMCn4JQf4=
golang.org/x/mod v0.2.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
golang.org/x/mod v0.3.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/net v0.0.0-20200114155413-6afb5195e5aa/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/net v0.0.0-20200226121028-0de0cce0169b/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/net v0.0.0-20201021035429-f5854403a974/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU=
golang.org/x/net v0.0.0-20211112202133-69e39bad7dc2/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y=
golang.org/x/net v0.0.0-20220725212005-46097bf591d3/go.mod h1:AaygXjzTFtRAg2ttMY5RMuhpJ3cNnI0XpyFJD1iQRSM=
golang.org/x/net v0.8.0 h1:Zrh2ngAOFYneWTAIAPethzeaQLuHwhuBkuV6ZiRnUaQ=
golang.org/x/net v0.8.0/go.mod h1:QVkue5JL9kW//ek3r6jTKnTFis1tRmNAW2P1shuFdJc=
golang.org/x/net v0.16.0 h1:7eBu7KsSvFDtSXUIDbh3aqlK4DPsZ1rByC8PFfBThos=
golang.org/x/net v0.16.0/go.mod h1:NxSsAGuq816PNPmqtQdLE42eU2Fs7NoRIZrHJAlaCOE=
golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.1.0 h1:wsuoTGHzEhffawBOhz5CYhcrV4IdKZbEyZjBMuTp12o=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210423082822-04245dca01da/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220811171246-fbc7d0a398ab/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.10.0 h1:SqMFp9UcQJZa+pmYuAKjd9xq1f0j5rLcDIk0mj4qAsA=
golang.org/x/sys v0.10.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.13.0 h1:Af8nKPmuFypiUBjVoU9V20FiaFXOcuZI21p0ycVYYGE=
golang.org/x/sys v0.13.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ=
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
golang.org/x/tools v0.0.0-20200619180055-7c47624df98f/go.mod h1:EkVYQZoAsY45+roYkvgYkIh4xh/qjgUK9TdY2XT94GE=
golang.org/x/tools v0.0.0-20210106214847-113979e3529a/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA=
golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1 h1:go1bK/D/BFZV2I8cIQd1NKEZ+0owSTG1fDTci4IqFcE=
golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
google.golang.org/protobuf v1.32.0 h1:pPC6BG5ex8PDFnkbrGU3EixyhKcQ2aDuBS36lqK/C7I=
google.golang.org/protobuf v1.32.0/go.mod h1:c6P6GXX6sHbq/GpV6MGZEdwhWPcYBgnhAHhKbcUYpos=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
......
package main
import (
"encoding/json"
"fmt"
"log"
"math/rand"
"sync"
"time"
"github.com/gofiber/fiber/v2"
"github.com/IBM/sarama"
"github.com/gofiber/fiber/v2"
// "github.com/gogo/protobuf/proto"
// "github.com/gogo/protobuf/types"
gogoPbProto "github.com/gogo/protobuf/proto"
//pbtypes "github.com/gogo/protobuf/types"
pbUpstream "github.com/odysseus/odysseus-protocol/gen/proto/go/base/v1"
//omanager "github.com/odysseus/odysseus-protocol/gen/proto/go/nodemanager/v1"
)
var (
......@@ -18,19 +28,73 @@ var (
topic = "test_topic"
// Set an appropriate buffer size based on your requirements
maxBufferSize = 100
aigcTopic = "pbaigc"
)
// Global counter to keep track of the messages
var counter int
// Internal slice to store messages received from Kafka
var messages = make([]string, 0, maxBufferSize)
// Mutex to synchronize access to the messages slice
var mutex = sync.Mutex{}
// Channel to send messages from the /producer endpoint to the Kafka producer goroutine
var producerMessages = make(chan string)
// Channel to send messages from the Kafka consumer to the /consumer endpoint
type bytesAndHeader struct {
Bytes []byte
HttpHeader map[string]string
}
var producerMessagesBytes = make(chan bytesAndHeader, 1000)
//var TaskUUIDInstream = make(chan string, 1000)
// Channel to send messages from the Kafka consumer to the /consumer endpoint
var consumerMessages = make(chan string)
func kafkaProducerBytes() {
// Create a new Sarama configuration for the Kafka producer.
config := sarama.NewConfig()
// Create a new Kafka producer using the specified configuration and broker addresses.
producer, err := sarama.NewAsyncProducer(kafkaBrokers, config)
if err != nil {
log.Fatal("Failed to start Kafka producer:", err)
}
// Ensure the Kafka producer is closed when the function ends (deferred execution).
defer producer.Close()
for message := range producerMessagesBytes {
header := make([]sarama.RecordHeader, 0, 1)
for k, v := range message.HttpHeader {
header = append(header, sarama.RecordHeader{
Key: []byte(k),
Value: []byte(v),
})
}
v := append(sarama.ByteEncoder{}, message.Bytes...)
//sarama.ByteEncoder =
producer.Input() <- &sarama.ProducerMessage{
Topic: aigcTopic,
Value: v,
Headers: header,
}
// Introduce random delay between 1 to 3 seconds for message push
//time.Sleep(time.Duration(1+rand.Intn(3)) * time.Second)
}
}
func kafkaProducer() {
// Create a new Sarama configuration for the Kafka producer.
......@@ -70,6 +134,61 @@ func kafkaProducer() {
}
}
func kafkaConsumerBytes(wg *sync.WaitGroup) {
// Create a new Sarama configuration for the Kafka producer.
config := sarama.NewConfig()
// Create a new Kafka consumer using the specified configuration and broker addresses.
consumer, err := sarama.NewConsumer(kafkaBrokers, config)
if err != nil {
log.Fatal("Failed to start Kafka consumer:", err)
}
// Ensure the Kafka consumer is closed when the function ends (deferred execution).
defer consumer.Close()
// Create a partition consumer for the specified topic, partition, and starting offset.
// The starting offset is set to sarama.OffsetNewest, which means the consumer will start consuming messages from the latest available offset.
partitionConsumer, err := consumer.ConsumePartition(aigcTopic, 0, sarama.OffsetNewest)
if err != nil {
log.Fatal("Failed to start partition consumer:", err)
}
// Ensure the partition consumer is closed when the function ends (deferred execution).
defer partitionConsumer.Close()
// Signal that the consumer goroutine is ready
wg.Done()
// Infinite loop to continuously listen for messages from the partitionConsumer.Messages() channel.
for {
select {
case message := <-partitionConsumer.Messages():
jsonMsg, err := json.Marshal(message)
if err != nil {
fmt.Println("consumer error", err.Error())
continue
}
//baseapi.
value := string(jsonMsg)
fmt.Printf("Received message from Kafka: %s\n", value)
// Acquire the mutex before appending to the messages slice to ensure concurrent-safe access.
mutex.Lock()
// Append the received message to the internal messages slice.
messages = append(messages, value)
// Release the mutex.
mutex.Unlock()
// Send the received message to the /consumer endpoint via the consumerMessages channel.
consumerMessages <- value
}
}
}
func kafkaConsumer(wg *sync.WaitGroup) {
// Create a new Sarama configuration for the Kafka producer.
......@@ -95,7 +214,7 @@ func kafkaConsumer(wg *sync.WaitGroup) {
defer partitionConsumer.Close()
// Signal that the consumer goroutine is ready
wg.Done()
defer wg.Done()
// Infinite loop to continuously listen for messages from the partitionConsumer.Messages() channel.
for {
......@@ -115,6 +234,37 @@ func kafkaConsumer(wg *sync.WaitGroup) {
}
}
// RoundStepType enumerates the state of the consensus state machine
type RoundStepType uint8 // These must be numeric, ordered.
// RoundStepType
const (
ChatCompletionsType = "ChatCompletions"
ImagesGenerationsType = "ImagesGenerations"
// ImagesVariations = "ImagesVariations"
// ImagesEdits = "ImagesEdits"
)
const (
ChatCompletionsFee = "10"
ImagesGenerationsFee = "20"
)
//POST
//https://api.openai.com/v1/chat/completions
// PrevoteType SignedMsgType = 0x01
// PrecommitType SignedMsgType = 0x02
// ProposalType SignedMsgType = 0x20
type ResponseJson struct {
TaskId string
TaskResult []byte
TaskUid string
TaskFee string
}
func main() {
// Create a new instance of the Fiber web framework.
app := fiber.New()
......@@ -127,12 +277,154 @@ func main() {
// Launch the Kafka producer goroutine in the background.
go kafkaProducer()
go kafkaProducerBytes()
// Launch the Kafka consumer goroutine in the background, passing the WaitGroup for synchronization.
go kafkaConsumer(wg)
go kafkaConsumerBytes(wg)
// Wait for the consumer goroutine to be ready
wg.Wait()
app.Post("/chat/completions", func(c *fiber.Ctx) error {
body := c.Body()
fmt.Println("body", string(body))
reqHeaders := c.GetReqHeaders()
// fmt.Println("reqHeaders[\"Task-Id\"]", reqHeaders["Task-Id"])
//prefix := fmt.Sprintf("%02x", ImagesGenerations)
// _ =pbUpstream
pbMsg := pbUpstream.TaskContent{
TaskId: reqHeaders["Task-Id"],
TaskType: ChatCompletionsType,
TaskCmd: "not provide",
TaskParam: body,
TaskTimestamp: uint64(time.Now().UnixMilli()),
TaskCallback: "http://192.168.1.220:6000/callback",
TaskUid: reqHeaders["X-Consumer-Custom-Id"],
TaskFee: ChatCompletionsFee,
}
pbBytes, err := gogoPbProto.Marshal(&pbMsg)
if err != nil {
fmt.Println("pb error", err.Error())
return c.SendString(fmt.Sprintf("pb error: %v", err.Error()))
}
// res := make([]byte, 0, len(prefix)+len(reqHeaders["Task-Id"])+len(body))
// res = append(res, []byte(prefix)...)
// res = append(res, reqHeaders["Task-Id"]...)
// res = append(res, body...)
producerMessagesBytes <- bytesAndHeader{
Bytes: pbBytes,
HttpHeader: reqHeaders,
}
wait := req(pbMsg.TaskId)
resAsPb := <-wait
resAsJson := ResponseJson{
TaskId: resAsPb.TaskId,
TaskResult: resAsPb.TaskResult,
TaskUid: resAsPb.TaskUid,
TaskFee: resAsPb.TaskFee,
}
return c.JSON(resAsJson)
//return c.SendString("Message sent to Kafka producer.")
})
app.Post("/images/generations", func(c *fiber.Ctx) error {
body := c.Body()
fmt.Println("body", string(body))
reqHeaders := c.GetReqHeaders()
fmt.Println("reqHeaders[\"Task-Id\"]", reqHeaders["Task-Id"])
//prefix := fmt.Sprintf("%02x", ImagesGenerations)
// _ =pbUpstream
pbMsg := pbUpstream.TaskContent{
TaskId: reqHeaders["Task-Id"],
TaskType: ImagesGenerationsType,
TaskCmd: "not provide",
TaskParam: body,
TaskTimestamp: uint64(time.Now().UnixMilli()),
TaskCallback: "http://192.168.1.220:6000/callback",
TaskUid: reqHeaders["X-Consumer-Custom-Id"],
TaskFee: ImagesGenerationsFee,
}
pbBytes, err := gogoPbProto.Marshal(&pbMsg)
if err != nil {
fmt.Println("pb error", err.Error())
return c.SendString(fmt.Sprintf("pb error: %v", err.Error()))
}
// res := make([]byte, 0, len(prefix)+len(reqHeaders["Task-Id"])+len(body))
// res = append(res, []byte(prefix)...)
// res = append(res, reqHeaders["Task-Id"]...)
// res = append(res, body...)
producerMessagesBytes <- bytesAndHeader{
Bytes: pbBytes,
HttpHeader: reqHeaders,
}
wait := req(pbMsg.TaskId)
resAsPb := <-wait
resAsJson := ResponseJson{
TaskId: resAsPb.TaskId,
TaskResult: resAsPb.TaskResult,
TaskUid: resAsPb.TaskUid,
TaskFee: resAsPb.TaskFee,
}
return c.JSON(resAsJson)
//return c.SendStatus(200)
//return c.SendString("Message sent to Kafka producer.")
})
app.Post("/images/variations", func(c *fiber.Ctx) error {
body := c.Body()
fmt.Println("body", string(body))
return c.SendStatus(200)
// message := c.Params("message")
// // Sending message to the Kafka producer via the producerMessages channel
// producerMessages <- message
// return c.SendString("Message sent to Kafka producer.")
})
app.Post("/images/edits", func(c *fiber.Ctx) error {
body := c.Body()
fmt.Println("body", string(body))
return c.SendStatus(200)
// message := c.Params("message")
// // Sending message to the Kafka producer via the producerMessages channel
// producerMessages <- message
// return c.SendString("Message sent to Kafka producer.")
})
// The /producer endpoint for sending messages to the Kafka producer
app.Get("/producer/:message", func(c *fiber.Ctx) error {
message := c.Params("message")
......@@ -143,6 +435,7 @@ func main() {
// The /consumer endpoint for receiving messages from the Kafka consumer
app.Get("/consumer", func(c *fiber.Ctx) error {
select {
case msg := <-consumerMessages:
// If a message is available in the consumerMessages channel, return it as the response.
......@@ -153,5 +446,26 @@ func main() {
}
})
log.Fatal(app.Listen(":3000"))
app.Post("/callback", func(c *fiber.Ctx) error {
body := c.Body()
fmt.Println("body", string(body))
var resbody pbUpstream.TaskResponse
if err := gogoPbProto.Unmarshal(body, &resbody); err != nil {
return c.SendString(fmt.Sprintf("Unmarshal error %v", err.Error()))
}
res(resbody)
return c.SendStatus(200)
// message := c.Params("message")
// // Sending message to the Kafka producer via the producerMessages channel
// producerMessages <- message
// return c.SendString("Message sent to Kafka producer.")
})
log.Fatal(app.Listen("0.0.0.0:6000"))
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment