Commit cc5ef839 authored by Cloud User's avatar Cloud User

Revert "add error msg"

This reverts commit e98d09c0.
parent e98d09c0
cache @ dd268a4a
Subproject commit dd268a4a9e7c33d4551b961c665ca7cff69ab1a3
...@@ -10,22 +10,16 @@ import ( ...@@ -10,22 +10,16 @@ import (
) )
type apiQueryTxsForAddr struct { type apiQueryTxsForAddr struct {
uuid string uuid string
//res chan pbUpstream.TaskResponse res chan pbUpstream.TaskResponse
res chan ResWithError
async bool async bool
} }
type ResWithError struct {
res pbUpstream.TaskResponse
err error
}
var ApiQueryTxsByAddrForQueue = make(chan apiQueryTxsForAddr, 1000) var ApiQueryTxsByAddrForQueue = make(chan apiQueryTxsForAddr, 1000)
func syncReq(uuid string) chan ResWithError { func syncReq(uuid string) chan pbUpstream.TaskResponse {
res := make(chan ResWithError) res := make(chan pbUpstream.TaskResponse)
ApiQueryTxsByAddrForQueue <- apiQueryTxsForAddr{ ApiQueryTxsByAddrForQueue <- apiQueryTxsForAddr{
uuid: uuid, uuid: uuid,
...@@ -47,24 +41,23 @@ func asyncReq(uuid string) { ...@@ -47,24 +41,23 @@ func asyncReq(uuid string) {
} }
// func callbackRes(res pbUpstream.TaskResponse) { func callbackRes(res pbUpstream.TaskResponse) {
func callbackRes(res ResWithError) {
if v, ok := recordmap.LoadAndDelete(res.res.TaskId); ok { if v, ok := recordmap.LoadAndDelete(res.TaskId); ok {
//resAsV, ok := v.(chan pbUpstream.TaskResponse) //resAsV, ok := v.(chan pbUpstream.TaskResponse)
resAsV, ok := v.(apiQueryTxsForAddr) resAsV, ok := v.(apiQueryTxsForAddr)
if ok { if ok {
if !resAsV.async { if !resAsV.async {
resAsV.res <- res resAsV.res <- res
} else { } else {
asyncmap.Add(res.res.TaskId, res) asyncmap.Add(res.TaskId, res)
//asyncmap.Store(res.TaskId, res) //asyncmap.Store(res.TaskId, res)
} }
} }
} }
} }
func getAsyncRes(uuid string) (ResWithError, bool) { func getAsyncRes(uuid string) (pbUpstream.TaskResponse, bool) {
return asyncmap.Get(uuid) return asyncmap.Get(uuid)
...@@ -79,11 +72,11 @@ func getAsyncRes(uuid string) (ResWithError, bool) { ...@@ -79,11 +72,11 @@ func getAsyncRes(uuid string) (ResWithError, bool) {
var recordmap sync.Map var recordmap sync.Map
var asyncmap *expirable.LRU[string, ResWithError] var asyncmap *expirable.LRU[string, pbUpstream.TaskResponse]
func init() { func init() {
asyncmap = expirable.NewLRU[string, ResWithError](10000000, nil, time.Hour*1) asyncmap = expirable.NewLRU[string, pbUpstream.TaskResponse](10000000, nil, time.Hour*1)
} }
func recordUUID() { func recordUUID() {
......
...@@ -49,8 +49,6 @@ RUN cat ./go-kafka/id_rsa > /root/.ssh/id_rsa && \ ...@@ -49,8 +49,6 @@ RUN cat ./go-kafka/id_rsa > /root/.ssh/id_rsa && \
#RUN go build -o /go-kafka #RUN go build -o /go-kafka
RUN cd go-kafka && go mod tidy && make build-docker-ouput RUN cd go-kafka && go mod tidy && make build-docker-ouput
COPY ./docs/ /docs/
#go build -o /ai-api-mgr #go build -o /ai-api-mgr
...@@ -59,7 +57,6 @@ FROM alpine ...@@ -59,7 +57,6 @@ FROM alpine
WORKDIR /root WORKDIR /root
COPY --from=build /ai-api-mgr /usr/bin/ai-api-mgr COPY --from=build /ai-api-mgr /usr/bin/ai-api-mgr
COPY --from=build /docs /usr/bin/docs/
#ENTRYPOINT [ "/usr/bin/ai-api-mgr" ] #ENTRYPOINT [ "/usr/bin/ai-api-mgr" ]
......
swagger: '2.0'
info: info:
title: feature.proto contact: {}
version: version not set paths: {}
schemes: swagger: "2.0"
- http
- https
consumes:
- application/json
produces:
- application/json
#host: 192.168.1.220:8080
host: api.aigic.ai
#URL: "https://api.aigic.ai/swagger.yaml",
# 1) Define the security scheme type (HTTP bearer)
components:
securitySchemes:
bearerAuth: # arbitrary name for the security scheme
type: http
scheme: bearer
bearerFormat: JWT # optional, arbitrary value for documentation purposes
name: authorization
ApiKeyAuth: # arbitrary name for the security scheme
type: apiKey
in: header # can be "header", "query" or "cookie"
name: apiKey # name of the header, query parameter or cookie
paths:
/query/v1/{taskid}:
get:
summary: get /query/v1/{taskid}
responses:
'200':
description: ''
schema:
$ref: '#/definitions/normalResponseJsonObject'
parameters:
- name: taskid
in: path
required: true
type: string
format: uuid
/api/v1/{subPath}:
post:
security:
- ApiKeyAuth: []
summary: post /api/v1/txt2img/sd-1.5/base
responses:
'200':
description: ''
schema:
$ref: '#/definitions/normalResponseJsonObject'
parameters:
- name: subPath
in: path
required: true
type: string
format: url
- in: header
name: apikey
type: string
required: true
/api/v1/{subPath} async:
post:
security:
- ApiKeyAuth: []
summary: post /api/v1/txt2img/sd-1.5/base async
responses:
'200':
description: ''
schema:
$ref: '#/definitions/syncResponseObject'
parameters:
- name: subPath
in: path
required: true
type: string
format: url
- in: header
name: apikey
type: string
required: true
- in: header
name: Prefer
type: string
default: respond-async
required: true
/jwt/v1/{subPath}:
post:
security:
- bearerAuth: []
summary: post /jwt/v1/txt2img/sd-1.5/base
responses:
'200':
description: ''
schema:
$ref: '#/definitions/normalResponseJsonObject'
parameters:
- name: subPath
in: path
required: true
type: string
format: url
- in: header
name: authorization
type: string
required: true
/jwt/v1/{subPath} async:
post:
security:
- bearerAuth: []
summary: post /jwt/v1/txt2img/sd-1.5/base
responses:
'200':
description: ''
schema:
$ref: '#/definitions/syncResponseObject'
parameters:
- name: subPath
in: path
required: true
type: string
format: url
- in: header
name: authorization
type: string
required: true
- in: header
name: Prefer
type: string
default: respond-async
required: true
/callback/v1/:
post:
summary: post /callback/v1/
responses:
'200':
description: ''
requestBody:
required: true
schema:
$ref: '#/definitions/callbackReq'
definitions:
syncResponseObject:
type: object
properties:
task_id:
type: string
format: uuid
request_id:
type: string
estimate_exec_time:
type: integer
max_exec_time:
type: integer
normalResponseJsonObject:
type: object
properties:
output:
$ref: '#/definitions/output'
task:
$ref: '#/definitions/ResponseTask'
output:
type: object
description: |-
`output JSON object Struct` represents a structured data value for upstream
ResponseTask:
type: object
properties:
task_id:
type: string
format: uuid
task_uid:
type: integer
task_fee:
type: string
is_success:
type: boolean
task_error:
type: string
exec_code:
type: string
exec_error:
type: string
api_error:
type: string
enum:[
- "No + TaskIdKey + key found in request header"
- "No + RequestIdKey + " key found in request header"
- "No + TaskIdKey + value found in request header"
- "No + RequestIdKey + " value found in request header"
- "No + UIDkey + key found in request header"
- "No + UIDkey + value found in request header"
- "Uid value strconv atoi error is: \%s"
- "Query db error is: \%s"
- "Json marshal db content error is: \%s"
- "Can not find out the task id %s in result cache.""
- "Msg to kafka pb marshal error is: \%s"
- "sync call back error is: \%s"
- "sync response json marshal error is: \%s"
- "Callback proto unmarshal http body error is: \%s"
- "callback errror is: %s"
- "Http header in callback body marshal error is: %s"
- "Sync Or Async Return Json Unmarshal Error is: %s"
- "Task Return Json Marshal Error is: %s"
]
callbackReq:
protobufValue:
type: object
properties:
TaskId:
type: string
format: uuid
description: Represents task id.
TaskResultHeader:
type: string
format: byte
description: Represents a upstream http header in callback http body.
TaskResultBody:
type: string
format: byte
description: Represents a upstream http body in callback http body.
TaskUid:
type: string
TaskFee:
type: string
TaskIsSucceed:
type: boolean
format: boolean
TaskError:
type: string
format: string
TaskResultCode:
type: integer
TaskExecuteError:
type: string
format: string
\ No newline at end of file
...@@ -42,7 +42,6 @@ require ( ...@@ -42,7 +42,6 @@ require (
github.com/go-openapi/spec v0.20.8 // indirect github.com/go-openapi/spec v0.20.8 // indirect
github.com/go-openapi/swag v0.22.3 // indirect github.com/go-openapi/swag v0.22.3 // indirect
github.com/go-sql-driver/mysql v1.7.1 // indirect github.com/go-sql-driver/mysql v1.7.1 // indirect
github.com/gofiber/swagger v1.0.0 // indirect
github.com/golang/protobuf v1.5.3 // indirect github.com/golang/protobuf v1.5.3 // indirect
github.com/golang/snappy v0.0.4 // indirect github.com/golang/snappy v0.0.4 // indirect
github.com/gomodule/redigo v2.0.0+incompatible // indirect github.com/gomodule/redigo v2.0.0+incompatible // indirect
...@@ -91,7 +90,6 @@ require ( ...@@ -91,7 +90,6 @@ require (
github.com/samber/slog-loki/v3 v3.2.1 // indirect github.com/samber/slog-loki/v3 v3.2.1 // indirect
github.com/sirupsen/logrus v1.9.3 // indirect github.com/sirupsen/logrus v1.9.3 // indirect
github.com/stretchr/testify v1.8.4 // indirect github.com/stretchr/testify v1.8.4 // indirect
github.com/swaggo/files/v2 v2.0.0 // indirect
github.com/valyala/bytebufferpool v1.0.0 // indirect github.com/valyala/bytebufferpool v1.0.0 // indirect
github.com/valyala/fasthttp v1.51.0 // indirect github.com/valyala/fasthttp v1.51.0 // indirect
github.com/valyala/tcplisten v1.0.0 // indirect github.com/valyala/tcplisten v1.0.0 // indirect
......
...@@ -569,8 +569,6 @@ github.com/godbus/dbus/v5 v5.0.4/go.mod h1:xhWf0FNVPg57R7Z0UbKHbJfkEywrmjJnf7w5x ...@@ -569,8 +569,6 @@ github.com/godbus/dbus/v5 v5.0.4/go.mod h1:xhWf0FNVPg57R7Z0UbKHbJfkEywrmjJnf7w5x
github.com/godbus/dbus/v5 v5.0.6/go.mod h1:xhWf0FNVPg57R7Z0UbKHbJfkEywrmjJnf7w5xrFpKfA= github.com/godbus/dbus/v5 v5.0.6/go.mod h1:xhWf0FNVPg57R7Z0UbKHbJfkEywrmjJnf7w5xrFpKfA=
github.com/gofiber/fiber/v2 v2.52.0 h1:S+qXi7y+/Pgvqq4DrSmREGiFwtB7Bu6+QFLuIHYw/UE= github.com/gofiber/fiber/v2 v2.52.0 h1:S+qXi7y+/Pgvqq4DrSmREGiFwtB7Bu6+QFLuIHYw/UE=
github.com/gofiber/fiber/v2 v2.52.0/go.mod h1:KEOE+cXMhXG0zHc9d8+E38hoX+ZN7bhOtgeF2oT6jrQ= github.com/gofiber/fiber/v2 v2.52.0/go.mod h1:KEOE+cXMhXG0zHc9d8+E38hoX+ZN7bhOtgeF2oT6jrQ=
github.com/gofiber/swagger v1.0.0 h1:BzUzDS9ZT6fDUa692kxmfOjc1DZiloLiPK/W5z1H1tc=
github.com/gofiber/swagger v1.0.0/go.mod h1:QrYNF1Yrc7ggGK6ATsJ6yfH/8Zi5bu9lA7wB8TmCecg=
github.com/gofrs/uuid v4.2.0+incompatible/go.mod h1:b2aQJv3Z4Fp6yNu3cdSllBxTCLRxnplIgP/c0N/04lM= github.com/gofrs/uuid v4.2.0+incompatible/go.mod h1:b2aQJv3Z4Fp6yNu3cdSllBxTCLRxnplIgP/c0N/04lM=
github.com/gogo/googleapis v1.1.0/go.mod h1:gf4bu3Q80BeJ6H1S1vYPm8/ELATdvryBaNFGgqEef3s= github.com/gogo/googleapis v1.1.0/go.mod h1:gf4bu3Q80BeJ6H1S1vYPm8/ELATdvryBaNFGgqEef3s=
github.com/gogo/googleapis v1.2.0/go.mod h1:Njal3psf3qN6dwBtQfUmBZh2ybovJ0tlu3o/AC7HYjU= github.com/gogo/googleapis v1.2.0/go.mod h1:Njal3psf3qN6dwBtQfUmBZh2ybovJ0tlu3o/AC7HYjU=
...@@ -1239,8 +1237,6 @@ github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o ...@@ -1239,8 +1237,6 @@ github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o
github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk= github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk=
github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo= github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo=
github.com/subosito/gotenv v1.2.0/go.mod h1:N0PQaV/YGNqwC0u51sEeR/aUtSLEXKX9iv69rRypqCw= github.com/subosito/gotenv v1.2.0/go.mod h1:N0PQaV/YGNqwC0u51sEeR/aUtSLEXKX9iv69rRypqCw=
github.com/swaggo/files/v2 v2.0.0 h1:hmAt8Dkynw7Ssz46F6pn8ok6YmGZqHSVLZ+HQM7i0kw=
github.com/swaggo/files/v2 v2.0.0/go.mod h1:24kk2Y9NYEJ5lHuCra6iVwkMjIekMCaFq/0JQj66kyM=
github.com/swaggo/swag v1.16.3 h1:PnCYjPCah8FK4I26l2F/KQ4yz3sILcVUN3cTlBFA9Pg= github.com/swaggo/swag v1.16.3 h1:PnCYjPCah8FK4I26l2F/KQ4yz3sILcVUN3cTlBFA9Pg=
github.com/swaggo/swag v1.16.3/go.mod h1:DImHIuOFXKpMFAQjcC7FG4m3Dg4+QuUgUzJmKjI/gRk= github.com/swaggo/swag v1.16.3/go.mod h1:DImHIuOFXKpMFAQjcC7FG4m3Dg4+QuUgUzJmKjI/gRk=
github.com/syndtr/gocapability v0.0.0-20170704070218-db04d3cc01c8/go.mod h1:hkRG7XYTFWNJGYcbNJQlaLq0fg1yr4J4t/NcTQtrfww= github.com/syndtr/gocapability v0.0.0-20170704070218-db04d3cc01c8/go.mod h1:hkRG7XYTFWNJGYcbNJQlaLq0fg1yr4J4t/NcTQtrfww=
......
...@@ -34,20 +34,10 @@ import ( ...@@ -34,20 +34,10 @@ import (
"github.com/grafana/loki-client-go/loki" "github.com/grafana/loki-client-go/loki"
slogloki "github.com/samber/slog-loki/v3" slogloki "github.com/samber/slog-loki/v3"
"github.com/gofiber/fiber/v2/middleware/cors"
// "github.com/gofiber/fiber/v2" // "github.com/gofiber/fiber/v2"
// "github.com/gofiber/fiber/v2/middleware/recover" // "github.com/gofiber/fiber/v2/middleware/recover"
// slogfiber "github.com/samber/slog-fiber" // slogfiber "github.com/samber/slog-fiber"
// "log/slog" // "log/slog"
_ "github.com/odysseus/go-kafka/docs"
"github.com/gofiber/swagger"
// docs are generated by Swag CLI, you have to import them.
// replace with your own docs folder, usually "github.com/username/reponame/docs"
//_ "github.com/gofiber/swagger/example/docs"
) )
var ( var (
...@@ -88,8 +78,8 @@ var producerMessagesBytes = make(chan bytesAndHeader, 1000) ...@@ -88,8 +78,8 @@ var producerMessagesBytes = make(chan bytesAndHeader, 1000)
// Channel to send messages from the Kafka consumer to the /consumer endpoint // Channel to send messages from the Kafka consumer to the /consumer endpoint
var consumerMessages = make(chan string) var consumerMessages = make(chan string)
// const TaskIdAtrr = "Task-Id" const TaskIdAtrr = "Task-Id"
// const RequestId = "X-Kong-Request-Id" const RequestId = "X-Kong-Request-Id"
func kafkaProducerBytes() { func kafkaProducerBytes() {
...@@ -211,7 +201,7 @@ func kafkaConsumerBytes(done chan interface{}, req, resTopic, proofTopic string) ...@@ -211,7 +201,7 @@ func kafkaConsumerBytes(done chan interface{}, req, resTopic, proofTopic string)
} }
baseAttributes := []slog.Attr{} baseAttributes := []slog.Attr{}
baseAttributes = append(baseAttributes, slog.String(TaskIdKey, pbMsg.TaskId)) baseAttributes = append(baseAttributes, slog.String(TaskIdAtrr, pbMsg.TaskId))
slog.LogAttrs(context.Background(), slog.LevelInfo, "<- kafka consumer", append(baseAttributes, slog.String("topic", req))...) slog.LogAttrs(context.Background(), slog.LevelInfo, "<- kafka consumer", append(baseAttributes, slog.String("topic", req))...)
select { select {
...@@ -317,7 +307,7 @@ func kafkaConsumerBytes(done chan interface{}, req, resTopic, proofTopic string) ...@@ -317,7 +307,7 @@ func kafkaConsumerBytes(done chan interface{}, req, resTopic, proofTopic string)
} }
baseAttributes := []slog.Attr{} baseAttributes := []slog.Attr{}
baseAttributes = append(baseAttributes, slog.String(TaskIdKey, pbMsg.TaskId)) baseAttributes = append(baseAttributes, slog.String(TaskIdAtrr, pbMsg.TaskId))
slog.LogAttrs(context.Background(), slog.LevelInfo, "<- kafka consumer", append(baseAttributes, slog.String("topic", resTopic))...) slog.LogAttrs(context.Background(), slog.LevelInfo, "<- kafka consumer", append(baseAttributes, slog.String("topic", resTopic))...)
//slog.Info("kafka consumer", "topic", resTopic, "pbMsg.TaskId", pbMsg.TaskId) //slog.Info("kafka consumer", "topic", resTopic, "pbMsg.TaskId", pbMsg.TaskId)
...@@ -427,7 +417,7 @@ func kafkaConsumerBytes(done chan interface{}, req, resTopic, proofTopic string) ...@@ -427,7 +417,7 @@ func kafkaConsumerBytes(done chan interface{}, req, resTopic, proofTopic string)
} }
baseAttributes := []slog.Attr{} baseAttributes := []slog.Attr{}
baseAttributes = append(baseAttributes, slog.String(TaskIdKey, pbMsg.TaskId)) baseAttributes = append(baseAttributes, slog.String(TaskIdAtrr, pbMsg.TaskId))
slog.LogAttrs(context.Background(), slog.LevelInfo, "<- kafka consumer", append(baseAttributes, slog.String("topic", proofTopic))...) slog.LogAttrs(context.Background(), slog.LevelInfo, "<- kafka consumer", append(baseAttributes, slog.String("topic", proofTopic))...)
//slog.Info("kafka consumer", "topic", resTopic, "pbMsg.TaskId", pbMsg.TaskId) //slog.Info("kafka consumer", "topic", resTopic, "pbMsg.TaskId", pbMsg.TaskId)
...@@ -478,7 +468,7 @@ func batchToQuestDb(done chan interface{}, reqStream chan pbUpstream.TaskContent ...@@ -478,7 +468,7 @@ func batchToQuestDb(done chan interface{}, reqStream chan pbUpstream.TaskContent
seconds := nanoseconds / 1e9 seconds := nanoseconds / 1e9
baseAttributes := []slog.Attr{} baseAttributes := []slog.Attr{}
baseAttributes = append(baseAttributes, slog.String(TaskIdKey, task.TaskId)) baseAttributes = append(baseAttributes, slog.String(TaskIdAtrr, task.TaskId))
slog.LogAttrs(context.Background(), slog.LevelDebug, "questdb <- resStream", append(baseAttributes, slog.String("TaskUid", task.TaskUid))...) slog.LogAttrs(context.Background(), slog.LevelDebug, "questdb <- resStream", append(baseAttributes, slog.String("TaskUid", task.TaskUid))...)
for { for {
...@@ -521,7 +511,7 @@ func batchToQuestDb(done chan interface{}, reqStream chan pbUpstream.TaskContent ...@@ -521,7 +511,7 @@ func batchToQuestDb(done chan interface{}, reqStream chan pbUpstream.TaskContent
} }
baseAttributes := []slog.Attr{} baseAttributes := []slog.Attr{}
baseAttributes = append(baseAttributes, slog.String(TaskIdKey, task.TaskId)) baseAttributes = append(baseAttributes, slog.String(TaskIdAtrr, task.TaskId))
slog.LogAttrs(context.Background(), slog.LevelDebug, "questdb <- reqStream", append(baseAttributes, slog.String("task.TaskUid", task.TaskUid))...) slog.LogAttrs(context.Background(), slog.LevelDebug, "questdb <- reqStream", append(baseAttributes, slog.String("task.TaskUid", task.TaskUid))...)
//slog.Debug("questdb <- reqStream", "task.TaskId", task.TaskId, "task.TaskUid", task.TaskUid) //slog.Debug("questdb <- reqStream", "task.TaskId", task.TaskId, "task.TaskUid", task.TaskUid)
...@@ -807,7 +797,7 @@ func main() { ...@@ -807,7 +797,7 @@ func main() {
} }
baseAttributes := []slog.Attr{} baseAttributes := []slog.Attr{}
baseAttributes = append(baseAttributes, slog.String(TaskIdKey, "------------------------------")) baseAttributes = append(baseAttributes, slog.String(TaskIdAtrr, "------------------------------"))
baseAttributes = append(baseAttributes, slog.String("callback", callbackAddrP)) baseAttributes = append(baseAttributes, slog.String("callback", callbackAddrP))
baseAttributes = append(baseAttributes, slog.String("listenIpPort", listenIpPort)) baseAttributes = append(baseAttributes, slog.String("listenIpPort", listenIpPort))
baseAttributes = append(baseAttributes, slog.Bool("withBillDB", withBillDb)) baseAttributes = append(baseAttributes, slog.Bool("withBillDB", withBillDb))
...@@ -897,20 +887,29 @@ func main() { ...@@ -897,20 +887,29 @@ func main() {
// Create a new instance of the Fiber web framework. // Create a new instance of the Fiber web framework.
app := fiber.New() app := fiber.New()
app.Use(cors.New()) // cfg := slogfiber.Config{
// DefaultLevel: slog.LevelInfo,
app.Static("/", "./docs") // ClientErrorLevel: slog.LevelWarn,
// ServerErrorLevel: slog.LevelError,
//app.Get("/swagger/*", swagger.HandlerDefault) // default
// WithUserAgent: false,
// WithRequestID: false,
// WithRequestBody: false,
// WithRequestHeader: true,
// WithResponseBody: false,
// WithResponseHeader: false,
// WithSpanID: false,
// WithTraceID: false,
// //Filters: []Filter{},
// }
//_ = cfg
app.Get("/swagger/220/*", swagger.New(swagger.Config{ // custom //app := fiber.New()
URL: "http://192.168.1.220:8080/swagger.yaml", //app.Use(slogfiber.NewWithConfig(slog.Default(), cfg))
}))
app.Get("/swagger/test/*", swagger.New(swagger.Config{ // custom //app.Use(slogfiber.New(slog.Default()))
URL: "https://api.aigic.ai/swagger.yaml", //app.Use(recover.New())
//URL: "http://192.168.1.220:8080/swagger.yaml",
}))
app.Use(slogfiber.New(slog.Default())) app.Use(slogfiber.New(slog.Default()))
app.Use(recover.New()) app.Use(recover.New())
...@@ -922,12 +921,77 @@ func main() { ...@@ -922,12 +921,77 @@ func main() {
apiGroupV1.Post("/*", ApiOrJWT) apiGroupV1.Post("/*", ApiOrJWT)
jwtGroupV1 := jwtGroup.Group("/v1") jwtGroupV1 := jwtGroup.Group("/v1")
//jwtGroupV1.Post("/*", slogfiber.New(slog.Default()), ApiOrJWT)
jwtGroupV1.Post("/*", ApiOrJWT) jwtGroupV1.Post("/*", ApiOrJWT)
//curl -X GET http://127.0.0.1:4000/api/jobs?id=e45b5ebc-c71e-4ab8-b10f-d1202e7fb16e //curl -X GET http://127.0.0.1:4000/api/jobs?id=e45b5ebc-c71e-4ab8-b10f-d1202e7fb16e
//slogfiber.NewWithConfig(slog.Default(), cfg) //slogfiber.NewWithConfig(slog.Default(), cfg)
queryGroup := app.Group("/query")
queryGroupV1 := queryGroup.Group("/v1")
queryGroupV1.Get("/:taskId", func(c *fiber.Ctx) error {
taskId := c.Params("taskId")
slogfiber.AddCustomAttributes(c, slog.String(TaskIdAtrr, taskId))
queryBaseAttributes := []slog.Attr{}
queryBaseAttributes = append(queryBaseAttributes, slog.String(TaskIdAtrr, taskId))
reqHeaders := c.GetReqHeaders()
// if taskHeaders, ok := reqHeaders[TaskIdAtrr]; ok {
// if taskHeaders == nil || len(taskHeaders) == 0 {
// return ApiErrorF(c, fmt.Sprintf(fmt.Sprintf("Please provide http header %s value, right now the value is %v.", TaskIdAtrr, taskHeaders)), queryBaseAttributes)
// }
// } else {
// return ApiErrorF(c, fmt.Sprintf("Please provide http header %s .", TaskIdAtrr), queryBaseAttributes)
// }
if RequestIdHeaders, ok := reqHeaders[RequestId]; ok {
if RequestIdHeaders == nil || len(RequestIdHeaders) == 0 {
return ApiErrorF(c, fmt.Sprintf(fmt.Sprintf("Please provide http header %s value, right now the value is %v.", RequestId, RequestIdHeaders)), queryBaseAttributes)
}
} else {
return ApiErrorF(c, fmt.Sprintf("Please provide http header %s .", RequestId), queryBaseAttributes)
}
slogfiber.AddCustomAttributes(c, slog.String(RequestId, reqHeaders[RequestId][0]))
// baseAttributes := []slog.Attr{}
// baseAttributes = append(baseAttributes, slog.String(TaskIdAtrr, taskId))
newAttributes := append(queryBaseAttributes, slog.String("c.Path()", c.Path()), slog.String("c.Route().Path", c.Route().Path))
slog.LogAttrs(c.UserContext(), slog.LevelInfo, "http api query task", newAttributes...)
if len(taskId) == 0 {
return ApiErrorF(c, fmt.Sprintf("%s must provide task id param for route %s ; e.g. http://127.0.0.1/query/v1/e45b5ebc-c71e-4ab8-b10f-d1202e7fb16e ", c.Path(), c.Route().Path), newAttributes)
}
if res, ok := getAsyncRes(taskId); ok {
newAttributes := append(queryBaseAttributes, slog.Int("resAsPb.TaskResultCode", int(res.TaskResultCode)))
newAttributes = append(newAttributes, slog.String("TaskResultHeader", string(res.TaskResultHeader)))
if isBase64(res.TaskResultBody) {
newAttributes = append(newAttributes, slog.Int("TaskResultBody as Base64", len(res.TaskResultBody)))
} else {
newAttributes = append(newAttributes, slog.String("TaskResultBody is not Base64", string(res.TaskResultBody)))
}
slog.LogAttrs(c.UserContext(), slog.LevelInfo, "getAsyncRes", newAttributes...)
return syncOrAsyncReturn(c, res, taskId, baseAttributes)
}
return ApiErrorF(c, fmt.Sprintf("can not find out the task id %s in result cache.", taskId), attributes)
})
cfg := slogfiber.Config{ cfg := slogfiber.Config{
WithUserAgent: true, WithUserAgent: true,
WithRequestID: true, WithRequestID: true,
...@@ -947,90 +1011,28 @@ func main() { ...@@ -947,90 +1011,28 @@ func main() {
callbackGroupV1 := callbackGroup.Group("/v1") callbackGroupV1 := callbackGroup.Group("/v1")
callbackGroupV1.Post("/", slogfiber.NewWithConfig(slog.Default(), cfg), func(c *fiber.Ctx) error { callbackGroupV1.Post("/", slogfiber.NewWithConfig(slog.Default(), cfg), func(c *fiber.Ctx) error {
callbackBaseAttributes := []slog.Attr{} slog.LogAttrs(c.UserContext(), slog.LevelInfo, "callback", slog.String("path", c.Route().Path))
callbackBaseAttributes = append(callbackBaseAttributes, slog.String("path", c.Route().Path))
slog.LogAttrs(c.UserContext(), slog.LevelInfo, "callback", callbackBaseAttributes...)
body := c.Body() body := c.Body()
var resbody pbUpstream.TaskResponse var resbody pbUpstream.TaskResponse
if err := gogoPbProto.Unmarshal(body, &resbody); err != nil { if err := gogoPbProto.Unmarshal(body, &resbody); err != nil {
//CallbackPbUnmarshalError return ApiErrorF(c, fmt.Sprintf("callback Unmarshal error %v", err.Error()), attributes)
callbackRes(ResWithError{err: err})
return ApiErrorF(c, NewError(fmt.Sprintf(CallbackPbUnmarshalError, err.Error()), ""), callbackBaseAttributes)
//return ApiErrorF(c, fmt.Sprintf("callback Unmarshal error %v", err.Error()), attributes)
} }
slogfiber.AddCustomAttributes(c, slog.String(TaskIdKey, resbody.TaskId)) slogfiber.AddCustomAttributes(c, slog.String(TaskIdAtrr, resbody.TaskId))
//baseAttributes := []slog.Attr{} baseAttributes := []slog.Attr{}
callbackBaseAttributes = append(callbackBaseAttributes, slog.String(TaskIdKey, resbody.TaskId)) baseAttributes = append(baseAttributes, slog.String(TaskIdAtrr, resbody.TaskId))
slog.LogAttrs(c.UserContext(), slog.LevelDebug, "callback", append(callbackBaseAttributes, slog.Bool("TaskIsSucceed", resbody.TaskIsSucceed), slog.String(TaskIdKey, resbody.TaskId), slog.String("TaskUid", resbody.TaskUid))...) slog.LogAttrs(c.UserContext(), slog.LevelDebug, "callback", append(baseAttributes, slog.Bool("TaskIsSucceed", resbody.TaskIsSucceed), slog.String(TaskIdAtrr, resbody.TaskId), slog.String("TaskUid", resbody.TaskUid))...)
callbackRes(ResWithError{err: nil, res: resbody}) callbackRes(resbody)
return c.SendStatus(200) return c.SendStatus(200)
}) })
queryGroup := app.Group("/query")
queryGroupV1 := queryGroup.Group("/v1")
queryGroupV1.Get("/", func(c *fiber.Ctx) error {
//queryGroupV1.Get("/:taskId", func(c *fiber.Ctx) error {
queryBaseAttributes := []slog.Attr{}
reqHeaders := c.GetReqHeaders()
requestId, err := checkRequestIdInHttpHeader(reqHeaders)
if err != nil {
return ApiErrorF(c, NewError(err.Error(), ""), queryBaseAttributes)
}
slogfiber.AddCustomAttributes(c, slog.String(RequestIdKey, requestId))
queryBaseAttributes = append(queryBaseAttributes, slog.String(RequestIdKey, requestId))
taskId, err := checkTaskIdInHttpHeader(reqHeaders)
if err != nil {
return ApiErrorF(c, NewError(err.Error(), ""), queryBaseAttributes)
}
slogfiber.AddCustomAttributes(c, slog.String(TaskIdKey, taskId))
queryBaseAttributes = append(queryBaseAttributes, slog.String(TaskIdKey, taskId))
// var taskId string
// params := c.AllParams()
// if v, ok := params["taskId"]; ok {
// taskId = v
// } else {
// return ApiErrorF(c, NewError(NoTaskIdKeyFoundInHeader, ""), queryBaseAttributes)
// }
newAttributes := append(queryBaseAttributes, slog.String("c.Path()", c.Path()), slog.String("c.Route().Path", c.Route().Path))
slog.LogAttrs(c.UserContext(), slog.LevelInfo, "http api query task", newAttributes...)
if res, ok := getAsyncRes(taskId); ok {
newAttributes := append(queryBaseAttributes, slog.Int("resAsPb.TaskResultCode", int(res.res.TaskResultCode)))
newAttributes = append(newAttributes, slog.String("TaskResultHeader", string(res.res.TaskResultHeader)))
slog.LogAttrs(c.UserContext(), slog.LevelInfo, "getAsyncRes", newAttributes...)
if isBase64(res.res.TaskResultBody) {
newAttributes = append(queryBaseAttributes, slog.Int("TaskResultBody as Base64", len(res.res.TaskResultBody)))
slog.LogAttrs(c.UserContext(), slog.LevelInfo, "getAsyncRes TaskResultBody", newAttributes...)
} else {
newAttributes = append(queryBaseAttributes, slog.Any("TaskResultBody is not Base64", string(res.res.TaskResultBody)))
slog.LogAttrs(c.UserContext(), slog.LevelInfo, "getAsyncRes TaskResultBody", newAttributes...)
}
return syncOrAsyncReturn(c, res.res, taskId, requestId, newAttributes)
}
return ApiErrorF(c, NewError(fmt.Sprintf(QueryTaskByIdError, taskId), requestId), newAttributes)
})
task := model.TaskType{} task := model.TaskType{}
if err := json.Unmarshal([]byte(taskJsonStr), &task); err != nil { if err := json.Unmarshal([]byte(taskJsonStr), &task); err != nil {
...@@ -1048,34 +1050,59 @@ var replanceQueryTask *model.TaskType ...@@ -1048,34 +1050,59 @@ var replanceQueryTask *model.TaskType
func ApiOrJWT(c *fiber.Ctx) error { func ApiOrJWT(c *fiber.Ctx) error {
ApiOrJWTBaseAttributes := []slog.Attr{} // cfg := slogfiber.Config{
// WithUserAgent: true,
// WithRequestID: true,
// WithRequestBody: true,
// WithRequestHeader: true,
// WithResponseBody: false,
// WithResponseHeader: true,
// WithSpanID: true,
// WithTraceID: true,
// }
//slogfiber.NewWithConfig(slog.Default(), cfg)
reqHeaders := c.GetReqHeaders() reqHeaders := c.GetReqHeaders()
requestId, err := checkRequestIdInHttpHeader(reqHeaders) // for k, v := range reqHeaders {
if err != nil { // fmt.Println("k", k, "v", v)
return ApiErrorF(c, NewError(err.Error(), ""), ApiOrJWTBaseAttributes) // }
}
slogfiber.AddCustomAttributes(c, slog.String(RequestIdKey, requestId)) if taskHeaders, ok := reqHeaders[TaskIdAtrr]; ok {
ApiOrJWTBaseAttributes = append(ApiOrJWTBaseAttributes, slog.String(RequestIdKey, requestId)) if taskHeaders == nil || len(taskHeaders) == 0 {
taskId, err := checkTaskIdInHttpHeader(reqHeaders) baseAttributes := []slog.Attr{}
if err != nil { baseAttributes = append(baseAttributes, slog.String(TaskIdAtrr, "can not get "+TaskIdAtrr))
return ApiErrorF(c, NewError(err.Error(), requestId), ApiOrJWTBaseAttributes)
return ApiErrorF(c, fmt.Sprintf(fmt.Sprintf("Please provide http header %s value, right now the value is %v.", TaskIdAtrr, taskHeaders)), baseAttributes)
}
} else {
baseAttributes := []slog.Attr{}
baseAttributes = append(baseAttributes, slog.String(TaskIdAtrr, "can not get "+TaskIdAtrr))
return ApiErrorF(c, fmt.Sprintf("Please provide http header %s .", TaskIdAtrr), baseAttributes)
} }
slogfiber.AddCustomAttributes(c, slog.String(TaskIdKey, taskId)) baseAttributes := []slog.Attr{}
ApiOrJWTBaseAttributes = append(ApiOrJWTBaseAttributes, slog.String(TaskIdKey, taskId)) baseAttributes = append(baseAttributes, slog.String(TaskIdAtrr, reqHeaders[TaskIdAtrr][0]))
slogfiber.AddCustomAttributes(c, slog.String(TaskIdAtrr, reqHeaders[TaskIdAtrr][0]))
uidAsInt, err := checkUIDInHttpHeader(reqHeaders) if RequestIdHeaders, ok := reqHeaders[RequestId]; ok {
if err != nil { if RequestIdHeaders == nil || len(RequestIdHeaders) == 0 {
return ApiErrorF(c, NewError(err.Error(), requestId), ApiOrJWTBaseAttributes) return ApiErrorF(c, fmt.Sprintf("Please provide http header %s value, right now the value is %v.", RequestId, RequestIdHeaders), nil)
}
} else {
return ApiErrorF(c, fmt.Sprintf("Please provide http header %s .", RequestId), baseAttributes)
} }
slogfiber.AddCustomAttributes(c, slog.Int(UIDkey, uidAsInt))
ApiOrJWTBaseAttributes = append(ApiOrJWTBaseAttributes, slog.Int(UIDkey, uidAsInt))
ApiOrJWTBaseAttributes = append(ApiOrJWTBaseAttributes, slog.String("match_path", c.Route().Path)) slogfiber.AddCustomAttributes(c, slog.String(RequestId, reqHeaders[RequestId][0]))
slog.LogAttrs(c.UserContext(), slog.LevelInfo, "new Api or JWT reuqest", ApiOrJWTBaseAttributes...)
//slogfiber.AddCustomAttributes(c, slog.String(TaskIdAtrr, reqHeaders[TaskIdAtrr][0]))
baseAttributes = append(baseAttributes, slog.String(RequestId, reqHeaders[RequestId][0]))
newAttributes := append(baseAttributes, slog.String("match_path", c.Route().Path))
slog.LogAttrs(c.UserContext(), slog.LevelInfo, "new Api or JWT reuqest", newAttributes...)
routePath := c.Route().Path routePath := c.Route().Path
routePathWithoutStar := strings.TrimSuffix(routePath, "/*") routePathWithoutStar := strings.TrimSuffix(routePath, "/*")
...@@ -1083,6 +1110,18 @@ func ApiOrJWT(c *fiber.Ctx) error { ...@@ -1083,6 +1110,18 @@ func ApiOrJWT(c *fiber.Ctx) error {
path := c.Path() path := c.Path()
pathInDB := strings.TrimPrefix(path, routePathWithoutStar) pathInDB := strings.TrimPrefix(path, routePathWithoutStar)
uid := reqHeaders["X-Consumer-Custom-Id"]
if uid == nil {
return ApiErrorF(c, fmt.Sprintf("uid can not be nil"), baseAttributes)
}
if len(uid) == 0 {
return ApiErrorF(c, fmt.Sprintf("len(uid) can not be 0"), baseAttributes)
}
uidAsInt, err := strconv.Atoi(uid[0])
queryDbAttributes := []slog.Attr{ queryDbAttributes := []slog.Attr{
slog.String("path", pathInDB), slog.String("path", pathInDB),
slog.Int("uid", uidAsInt), slog.Int("uid", uidAsInt),
...@@ -1095,7 +1134,7 @@ func ApiOrJWT(c *fiber.Ctx) error { ...@@ -1095,7 +1134,7 @@ func ApiOrJWT(c *fiber.Ctx) error {
Value: slog.GroupValue(queryDbAttributes...), Value: slog.GroupValue(queryDbAttributes...),
}, },
}, },
ApiOrJWTBaseAttributes..., baseAttributes...,
) )
slog.LogAttrs(c.UserContext(), slog.LevelDebug, "query db", attributes...) slog.LogAttrs(c.UserContext(), slog.LevelDebug, "query db", attributes...)
...@@ -1106,21 +1145,18 @@ func ApiOrJWT(c *fiber.Ctx) error { ...@@ -1106,21 +1145,18 @@ func ApiOrJWT(c *fiber.Ctx) error {
var err error var err error
task, err = cache.Query(pathInDB, int64(uidAsInt)) task, err = cache.Query(pathInDB, int64(uidAsInt))
//QueryDbError
if err != nil { if err != nil {
//QueryDbError return ApiErrorF(c, fmt.Sprintf("cache.Query %v", err.Error()), baseAttributes)
return ApiErrorF(c, NewError(fmt.Sprintf(QueryDbError, err.Error()), requestId), ApiOrJWTBaseAttributes)
} }
} else { } else {
task = replanceQueryTask task = replanceQueryTask
} }
//task.ResultFileExpires //task.ResultFileExpires
//json:"task_result_body,omitempty" //json:"task_result_body,omitempty"
reqHeaders["ResultFileExpiresDB"] = []string{fmt.Sprintf("%d", task.ResultFileExpires)} reqHeaders["ResultFileExpiresDB"] = []string{fmt.Sprintf("%d", task.ResultFileExpires)}
reqHeaders["EstimateExecTime"] = []string{fmt.Sprintf("%d", task.EstimateExecTime)}
reqHeaders["MaxExecTime"] = []string{fmt.Sprintf("%d", task.MaxExecTime)}
reqRelayAsJson, err := json.Marshal(struct { reqRelayAsJson, err := json.Marshal(struct {
Headers map[string][]string `json:"headers"` Headers map[string][]string `json:"headers"`
...@@ -1132,24 +1168,19 @@ func ApiOrJWT(c *fiber.Ctx) error { ...@@ -1132,24 +1168,19 @@ func ApiOrJWT(c *fiber.Ctx) error {
Body: c.Body(), Body: c.Body(),
}) })
if err != nil {
//QueryDbError
return ApiErrorF(c, NewError(fmt.Sprintf(JsonMarshalDbContentError, err.Error()), requestId), ApiOrJWTBaseAttributes)
}
// c.Protocol() // c.Protocol()
// map[string][]string // map[string][]string
// map[string]string // map[string]string
pbMsg := pbUpstream.TaskContent{ pbMsg := pbUpstream.TaskContent{
TaskId: taskId, TaskId: reqHeaders["Task-Id"][0],
TaskType: uint64(task.ID), TaskType: uint64(task.ID),
TaskKind: pbUpstream.TaskKind(task.Kind), TaskKind: pbUpstream.TaskKind(task.Kind),
TaskCmd: task.Cmd, TaskCmd: task.Cmd,
TaskParam: reqRelayAsJson, //[]byte(reqHeaders["Task-Id"][0]), TaskParam: reqRelayAsJson, //[]byte(reqHeaders["Task-Id"][0]),
TaskTimestamp: uint64(time.Now().UnixNano()), TaskTimestamp: uint64(time.Now().UnixNano()),
TaskCallback: "http://" + callbackAddr + "/callback/v1", //"http://192.168.1.10:6001/callback/v1", TaskCallback: "http://" + callbackAddr + "/callback/v1", //"http://192.168.1.10:6001/callback/v1",
TaskUid: fmt.Sprintf("%d", uidAsInt), TaskUid: reqHeaders["X-Consumer-Custom-Id"][0],
TaskFee: fmt.Sprintf("%d", task.Price), TaskFee: fmt.Sprintf("%d", task.Price),
TaskInLen: int32(len(c.Body())), TaskInLen: int32(len(c.Body())),
TaskWorkload: task.Workload, TaskWorkload: task.Workload,
...@@ -1161,7 +1192,7 @@ func ApiOrJWT(c *fiber.Ctx) error { ...@@ -1161,7 +1192,7 @@ func ApiOrJWT(c *fiber.Ctx) error {
slog.Uint64("TaskType", pbMsg.TaskType), slog.Uint64("TaskType", pbMsg.TaskType),
slog.Int("TaskKind", int(pbMsg.TaskKind)), slog.Int("TaskKind", int(pbMsg.TaskKind)),
slog.String("TaskCmd", pbMsg.TaskCmd), slog.String("TaskCmd", pbMsg.TaskCmd),
//slog.String("TaskParam", string(pbMsg.TaskParam)), slog.String("TaskParam", string(pbMsg.TaskParam)),
slog.String("TaskTimestamp", string(pbMsg.TaskTimestamp)), slog.String("TaskTimestamp", string(pbMsg.TaskTimestamp)),
slog.String("TaskCallback", pbMsg.TaskCallback), slog.String("TaskCallback", pbMsg.TaskCallback),
slog.String("TaskUid", pbMsg.TaskUid), slog.String("TaskUid", pbMsg.TaskUid),
...@@ -1178,19 +1209,30 @@ func ApiOrJWT(c *fiber.Ctx) error { ...@@ -1178,19 +1209,30 @@ func ApiOrJWT(c *fiber.Ctx) error {
Value: slog.GroupValue(msgToKafkaAttributes...), Value: slog.GroupValue(msgToKafkaAttributes...),
}, },
}, },
ApiOrJWTBaseAttributes..., baseAttributes...,
) )
slog.LogAttrs(c.UserContext(), slog.LevelDebug, "-> kafka producer", kafkaattributes...) slog.LogAttrs(c.UserContext(), slog.LevelDebug, "-> kafka producer", kafkaattributes...)
//slog.Any()
paramAttributes := append(ApiOrJWTBaseAttributes, slog.Any("TaskParam", pbMsg.TaskParam)) // slog.Debug("msg to kafka",
slog.LogAttrs(c.UserContext(), slog.LevelDebug, "-> kafka producer", paramAttributes...) // "TaskId", pbMsg.TaskId,
// "TaskType", pbMsg.TaskType,
// "TaskKind", pbMsg.TaskKind,
// "TaskCmd", pbMsg.TaskCmd,
// "TaskParam", pbMsg.TaskParam,
// "TaskTimestamp", pbMsg.TaskTimestamp,
// "TaskCallback", pbMsg.TaskCallback,
// "TaskUid", pbMsg.TaskUid,
// "TaskFee", pbMsg.TaskFee,
// "TaskInLen", pbMsg.TaskInLen,
// "TaskWorkload", pbMsg.TaskWorkload,
// "ContainerPubkey", pbMsg.ContainerPubkey)
pbBytes, err := gogoPbProto.Marshal(&pbMsg) pbBytes, err := gogoPbProto.Marshal(&pbMsg)
if err != nil { if err != nil {
return ApiErrorF(c, NewError(fmt.Sprintf(MsgToKafakaPbMarshalError, err.Error()), requestId), ApiOrJWTBaseAttributes) slog.LogAttrs(context.Background(), slog.LevelError, "ApiOrJWT", append([]slog.Attr{}, slog.String("pbMarshal", err.Error()))...)
return ApiErrorF(c, fmt.Sprintf("pb error: %v", err.Error()), baseAttributes)
} }
producerMessagesBytes <- bytesAndHeader{ producerMessagesBytes <- bytesAndHeader{
...@@ -1217,7 +1259,10 @@ func ApiOrJWT(c *fiber.Ctx) error { ...@@ -1217,7 +1259,10 @@ func ApiOrJWT(c *fiber.Ctx) error {
} }
} }
asyncModeAttributes := append(ApiOrJWTBaseAttributes, slog.Bool("async", asyncMode)) //slog.Info("client", "taskid", pbMsg.TaskId, "asyncMode", asyncMode)
asyncModeAttributes := append(baseAttributes, slog.Bool("async", asyncMode))
slog.LogAttrs(c.UserContext(), slog.LevelInfo, "client request mode", asyncModeAttributes...) slog.LogAttrs(c.UserContext(), slog.LevelInfo, "client request mode", asyncModeAttributes...)
if asyncMode { if asyncMode {
...@@ -1228,21 +1273,18 @@ func ApiOrJWT(c *fiber.Ctx) error { ...@@ -1228,21 +1273,18 @@ func ApiOrJWT(c *fiber.Ctx) error {
asyncReq(pbMsg.TaskId) asyncReq(pbMsg.TaskId)
res := SyncResponse{ res := SyncResponse{
TaskId: pbMsg.TaskId, TaskId: pbMsg.TaskId,
RequestId: requestId,
EstimateExecTime: task.EstimateExecTime,
MaxExecTime: task.MaxExecTime,
} }
resAsJson, err := json.Marshal(res) resAsJson, err := json.Marshal(res)
if err != nil { if err != nil {
//SyncResponseJsonMarshalError newAttributes := append(baseAttributes, slog.String("err", err.Error()))
return ApiErrorF(c, NewError(fmt.Sprintf(SyncResponseJsonMarshalError, err.Error()), requestId), ApiOrJWTBaseAttributes) slog.LogAttrs(c.UserContext(), slog.LevelError, "json.Marshal(res)", newAttributes...)
} }
if err := c.Send(resAsJson); err != nil { if err := c.Send(resAsJson); err != nil {
newAttributes := append(ApiOrJWTBaseAttributes, slog.String("err", err.Error())) newAttributes := append(baseAttributes, slog.String("err", err.Error()))
slog.LogAttrs(c.UserContext(), slog.LevelError, "c.JSON(pbMsg.TaskId);", newAttributes...) slog.LogAttrs(c.UserContext(), slog.LevelError, "c.JSON(pbMsg.TaskId);", newAttributes...)
return err return err
} }
...@@ -1250,43 +1292,45 @@ func ApiOrJWT(c *fiber.Ctx) error { ...@@ -1250,43 +1292,45 @@ func ApiOrJWT(c *fiber.Ctx) error {
return nil return nil
} else { } else {
return syncModeF(c, pbMsg.TaskId, requestId, ApiOrJWTBaseAttributes) return syncModeF(c, pbMsg.TaskId)
} }
} }
type SyncResponse struct {
TaskId string `json:"task_id"`
}
func isBase64(s []byte) bool { func isBase64(s []byte) bool {
_, err := base64.StdEncoding.DecodeString(string(s)) _, err := base64.StdEncoding.DecodeString(string(s))
return err == nil return err == nil
} }
func syncModeF(c *fiber.Ctx, taskid string, requestId string, ApiOrJWTBaseAttributes []slog.Attr) error { func syncModeF(c *fiber.Ctx, taskid string) error {
wait := syncReq(taskid) wait := syncReq(taskid)
resAsPb := <-wait resAsPb := <-wait
if resAsPb.err != nil { baseAttributes := []slog.Attr{}
return ApiErrorF(c, NewError(fmt.Sprintf(SynCallBackError, resAsPb.err.Error()), requestId), ApiOrJWTBaseAttributes) baseAttributes = append(baseAttributes, slog.String(TaskIdAtrr, taskid))
}
// newAttributes := append(baseAttributes, slog.Int("resAsPb.TaskResultCode", int(resAsPb.TaskResultCode)), slog.String("TaskResultHeader", string(resAsPb.TaskResultHeader)), slog.String("TaskResultBody", string(resAsPb.TaskResultBody))) // newAttributes := append(baseAttributes, slog.Int("resAsPb.TaskResultCode", int(resAsPb.TaskResultCode)), slog.String("TaskResultHeader", string(resAsPb.TaskResultHeader)), slog.String("TaskResultBody", string(resAsPb.TaskResultBody)))
newAttributes := append(ApiOrJWTBaseAttributes, slog.Int("resAsPb.TaskResultCode", int(resAsPb.res.TaskResultCode))) newAttributes := append(baseAttributes, slog.Int("resAsPb.TaskResultCode", int(resAsPb.TaskResultCode)))
newAttributes = append(newAttributes, slog.String("TaskResultHeader", string(resAsPb.res.TaskResultHeader))) newAttributes = append(newAttributes, slog.String("TaskResultHeader", string(resAsPb.TaskResultHeader)))
if isBase64(resAsPb.res.TaskResultBody) { if isBase64(resAsPb.TaskResultBody) {
newAttributes = append(newAttributes, slog.Int("TaskResultBody as Base64", len(resAsPb.res.TaskResultBody))) newAttributes = append(newAttributes, slog.Int("TaskResultBody as Base64", len(resAsPb.TaskResultBody)))
} else { } else {
newAttributes = append(newAttributes, slog.String("TaskResultBody", string(resAsPb.res.TaskResultBody))) newAttributes = append(newAttributes, slog.String("TaskResultBody", string(resAsPb.TaskResultBody)))
} }
slog.LogAttrs(c.UserContext(), slog.LevelInfo, "syncModeF", newAttributes...) slog.LogAttrs(c.UserContext(), slog.LevelInfo, "syncModeF", newAttributes...)
return syncOrAsyncReturn(c, resAsPb.res, taskid, requestId, ApiOrJWTBaseAttributes) return syncOrAsyncReturn(c, resAsPb, taskid, baseAttributes)
} }
func syncOrAsyncReturn(c *fiber.Ctx, resAsPb pbUpstream.TaskResponse, taskId, requestId string, ApiOrJWTBaseAttributes []slog.Attr) error { func syncOrAsyncReturn(c *fiber.Ctx, resAsPb pbUpstream.TaskResponse, reqTaskId string, baseAttributes []slog.Attr) error {
// 303 (See Other) responses always lead to the use of a GET method. // 303 (See Other) responses always lead to the use of a GET method.
// 307 (Temporary Redirect) and 308 (Permanent Redirect) don't change the method used in the original request. // 307 (Temporary Redirect) and 308 (Permanent Redirect) don't change the method used in the original request.
// 301 (Moved Permanently) and 302 (Found) don't change the method most of the time, though older user-agents may (so you basically don't know). // 301 (Moved Permanently) and 302 (Found) don't change the method most of the time, though older user-agents may (so you basically don't know).
...@@ -1302,7 +1346,8 @@ func syncOrAsyncReturn(c *fiber.Ctx, resAsPb pbUpstream.TaskResponse, taskId, re ...@@ -1302,7 +1346,8 @@ func syncOrAsyncReturn(c *fiber.Ctx, resAsPb pbUpstream.TaskResponse, taskId, re
headers := make(map[string][]string) headers := make(map[string][]string)
if err := json.Unmarshal(resAsPb.TaskResultHeader, &headers); err != nil { if err := json.Unmarshal(resAsPb.TaskResultHeader, &headers); err != nil {
return ApiErrorF(c, NewError(fmt.Sprintf(SyncOrAsyncReturnJsonUnmarshalError, err.Error()), requestId), ApiOrJWTBaseAttributes) slog.LogAttrs(context.Background(), slog.LevelError, "syncOrAsyncReturn", append([]slog.Attr{}, slog.String("reqTaskId", reqTaskId), slog.String(TaskIdAtrr, resAsPb.TaskId), slog.String("json.Unmarshal", err.Error()))...)
return ApiErrorF(c, fmt.Sprintf("json.Unmarshal(resAsPb.TaskResultHeader error: %v", err.Error()), baseAttributes)
} }
for k, vs := range headers { for k, vs := range headers {
...@@ -1310,7 +1355,7 @@ func syncOrAsyncReturn(c *fiber.Ctx, resAsPb pbUpstream.TaskResponse, taskId, re ...@@ -1310,7 +1355,7 @@ func syncOrAsyncReturn(c *fiber.Ctx, resAsPb pbUpstream.TaskResponse, taskId, re
if k == "Content-Encoding" { if k == "Content-Encoding" {
newAttributes := append(ApiOrJWTBaseAttributes, newAttributes := append(baseAttributes,
slog.String(k, v), slog.String(k, v),
) )
...@@ -1320,7 +1365,7 @@ func syncOrAsyncReturn(c *fiber.Ctx, resAsPb pbUpstream.TaskResponse, taskId, re ...@@ -1320,7 +1365,7 @@ func syncOrAsyncReturn(c *fiber.Ctx, resAsPb pbUpstream.TaskResponse, taskId, re
} }
if k == "Content-Type" { if k == "Content-Type" {
newAttributes := append(ApiOrJWTBaseAttributes, newAttributes := append(baseAttributes,
slog.String(k, v), slog.String(k, v),
) )
...@@ -1332,7 +1377,7 @@ func syncOrAsyncReturn(c *fiber.Ctx, resAsPb pbUpstream.TaskResponse, taskId, re ...@@ -1332,7 +1377,7 @@ func syncOrAsyncReturn(c *fiber.Ctx, resAsPb pbUpstream.TaskResponse, taskId, re
if redirectCode { if redirectCode {
if k == "Location" { if k == "Location" {
newAttributes := append(ApiOrJWTBaseAttributes, newAttributes := append(baseAttributes,
slog.String(k, v), slog.String(k, v),
slog.Int("resAsPb.TaskResultCode", int(resAsPb.TaskResultCode)), slog.Int("resAsPb.TaskResultCode", int(resAsPb.TaskResultCode)),
) )
...@@ -1341,6 +1386,10 @@ func syncOrAsyncReturn(c *fiber.Ctx, resAsPb pbUpstream.TaskResponse, taskId, re ...@@ -1341,6 +1386,10 @@ func syncOrAsyncReturn(c *fiber.Ctx, resAsPb pbUpstream.TaskResponse, taskId, re
c.Redirect(v, int(resAsPb.TaskResultCode)) c.Redirect(v, int(resAsPb.TaskResultCode))
//c.Response().Header.SetL
//c.Response().Header.Set
//c.Response().Header.SetContentType(v)
//c.Set("Location", v)
} }
} }
} }
...@@ -1352,11 +1401,11 @@ func syncOrAsyncReturn(c *fiber.Ctx, resAsPb pbUpstream.TaskResponse, taskId, re ...@@ -1352,11 +1401,11 @@ func syncOrAsyncReturn(c *fiber.Ctx, resAsPb pbUpstream.TaskResponse, taskId, re
// return redirectReturn(c, resAsPb) // return redirectReturn(c, resAsPb)
// } // }
return Return(c, resAsPb, requestId, ApiOrJWTBaseAttributes) return Return(c, resAsPb, baseAttributes)
} }
func Return(c *fiber.Ctx, resAsPb pbUpstream.TaskResponse, requestId string, baseAttributes []slog.Attr) error { func Return(c *fiber.Ctx, resAsPb pbUpstream.TaskResponse, baseAttributes []slog.Attr) error {
resAsJson := ResponseJson{ resAsJson := ResponseJson{
Task: ResponseTask{ Task: ResponseTask{
...@@ -1379,7 +1428,9 @@ func Return(c *fiber.Ctx, resAsPb pbUpstream.TaskResponse, requestId string, bas ...@@ -1379,7 +1428,9 @@ func Return(c *fiber.Ctx, resAsPb pbUpstream.TaskResponse, requestId string, bas
//fileAsJsonEscapeHTML,err := EncodeJsonEscapeHTML(fileAsJsonBytes) //fileAsJsonEscapeHTML,err := EncodeJsonEscapeHTML(fileAsJsonBytes)
if err := encoder.Encode(resAsJson); err != nil { if err := encoder.Encode(resAsJson); err != nil {
return ApiErrorF(c, NewError(fmt.Sprintf(SyncOrAsyncReturnJsonUnmarshalError, err.Error()), requestId), baseAttributes) newAttributes := append(baseAttributes, slog.String("err", err.Error()))
slog.LogAttrs(c.UserContext(), slog.LevelError, "encoder.Encode(resAsJson)", newAttributes...)
return err
} }
newAttributes := append(baseAttributes, newAttributes := append(baseAttributes,
...@@ -1412,49 +1463,32 @@ func Return(c *fiber.Ctx, resAsPb pbUpstream.TaskResponse, requestId string, bas ...@@ -1412,49 +1463,32 @@ func Return(c *fiber.Ctx, resAsPb pbUpstream.TaskResponse, requestId string, bas
//return c.JSON(resAsJson) //return c.JSON(resAsJson)
} }
func ApiErrorF(c *fiber.Ctx, msg ApiError, baseAttributes []slog.Attr) error { type ResponseJson struct {
Task ResponseTask `json:"task"`
resAsJson := ResponseJson{ Output json.RawMessage `json:"output"`
Task: ResponseTask{ }
IsSuccess: false,
ApiError: msg,
},
}
resbuffer := &bytes.Buffer{}
encoder := json.NewEncoder(resbuffer)
encoder.SetEscapeHTML(false)
//fileAsJsonEscapeHTML,err := EncodeJsonEscapeHTML(fileAsJsonBytes)
if err := encoder.Encode(resAsJson); err != nil {
newAttributes := append(baseAttributes, slog.String("err", err.Error()))
slog.LogAttrs(c.UserContext(), slog.LevelError, "encoder.Encode(resAsJson)", newAttributes...)
return err
}
newAttributes := append(baseAttributes,
slog.String("TaskId", resAsJson.Task.TaskId),
slog.String("TaskUid", resAsJson.Task.TaskUid),
slog.String("TaskFee", resAsJson.Task.TaskFee),
slog.Bool("IsSuccess", resAsJson.Task.IsSuccess),
slog.String("TaskError", resAsJson.Task.TaskError),
slog.String("ExecCode", resAsJson.Task.ExecCode),
slog.String("ExecError", resAsJson.Task.ExecError),
slog.String("ApiError", resAsJson.Task.ApiError.Msg),
slog.String("ApiError request id", resAsJson.Task.ApiError.RequestId))
slog.LogAttrs(c.UserContext(), slog.LevelInfo, "http return api error", newAttributes...)
if err := c.Status(fiber.StatusOK).JSON(resAsJson); err != nil { type ResponseTask struct {
newAttributes := append(baseAttributes, slog.String("err", err.Error())) TaskId string `json:"task_id"`
slog.LogAttrs(c.UserContext(), slog.LevelError, "c.Status(fiber.StatusOK).JSON(resAsJson)", newAttributes...) //TaskResult []byte `json:"task_result"`
return err TaskUid string `json:"task_uid"`
} TaskFee string `json:"task_fee"`
IsSuccess bool `json:"is_success"`
TaskError string `json:"task_error"`
ExecCode string `json:"exec_code"`
//ExecCode int32 `json:"exec_code"`
// "exec_code":"",
ExecError string `json:"exec_error"`
//Api ApiError `json:"api"`
ApiError string `json:"api_error"`
}
return nil type ApiError struct {
Code int `json:"code"`
Msg string `json:"msg"`
} }
func ApiErrorFByLogger(c *fiber.Ctx, msg ApiError, baseAttributes []slog.Attr) error {
func ApiErrorF(c *fiber.Ctx, msg string, baseAttributes []slog.Attr) error {
resAsJson := ResponseJson{ resAsJson := ResponseJson{
Task: ResponseTask{ Task: ResponseTask{
...@@ -1483,8 +1517,7 @@ func ApiErrorFByLogger(c *fiber.Ctx, msg ApiError, baseAttributes []slog.Attr) e ...@@ -1483,8 +1517,7 @@ func ApiErrorFByLogger(c *fiber.Ctx, msg ApiError, baseAttributes []slog.Attr) e
slog.String("TaskError", resAsJson.Task.TaskError), slog.String("TaskError", resAsJson.Task.TaskError),
slog.String("ExecCode", resAsJson.Task.ExecCode), slog.String("ExecCode", resAsJson.Task.ExecCode),
slog.String("ExecError", resAsJson.Task.ExecError), slog.String("ExecError", resAsJson.Task.ExecError),
slog.String("ApiError", resAsJson.Task.ApiError.Msg), slog.String("ApiError", resAsJson.Task.ApiError))
slog.String("ApiError request id", resAsJson.Task.ApiError.RequestId))
slog.LogAttrs(c.UserContext(), slog.LevelInfo, "http return api error", newAttributes...) slog.LogAttrs(c.UserContext(), slog.LevelInfo, "http return api error", newAttributes...)
......
odysseus-protocol @ b244b62f
Subproject commit b244b62f56e09c656f20a54d379bdde6f5b7ef3d
service-registry @ da2187b2
Subproject commit da2187b223a251b7e6049e0ee3a7951bdca76bee
package main
import (
"encoding/json"
"errors"
"strconv"
)
// const TaskIdAtrr = "Task-Id"
// const RequestId = "X-Kong-Request-Id"
// const UIDkey = "X-Consumer-Custom-Id"
const TaskIdKey = "Task-Id"
const RequestIdKey = "X-Kong-Request-Id"
const UIDkey = "X-Consumer-Custom-Id"
func checkRequestIdInHttpHeader(reqHeaders map[string][]string) (string, error) {
if RequestIdHeaders, ok := reqHeaders[RequestIdKey]; ok {
if RequestIdHeaders == nil || len(RequestIdHeaders) == 0 {
return "", errors.New(NoRequestIdKeyFoundInHeader)
} else {
return RequestIdHeaders[0], nil
}
} else {
return "", errors.New(NoRequestIdValueFoundInHeader)
}
}
func checkTaskIdInHttpHeader(reqHeaders map[string][]string) (string, error) {
if RequestIdHeaders, ok := reqHeaders[TaskIdKey]; ok {
if RequestIdHeaders == nil || len(RequestIdHeaders) == 0 {
return "", errors.New(NoTaskIdKeyFoundInHeader)
} else {
return RequestIdHeaders[0], nil
}
} else {
return "", errors.New(NoTaskIdValueFoundInHeader)
}
}
func checkUIDInHttpHeader(reqHeaders map[string][]string) (int, error) {
if RequestIdHeaders, ok := reqHeaders[UIDkey]; ok {
if RequestIdHeaders == nil || len(RequestIdHeaders) == 0 {
return 0, errors.New(NoTaskIdKeyFoundInHeader)
} else {
uidAsInt, err := strconv.Atoi(RequestIdHeaders[0])
if err != nil {
return 0, errors.New(UidValueStrconvAtoiError + err.Error())
}
return uidAsInt, nil
}
} else {
return 0, errors.New(NoUidValueFoundInHeader)
}
}
type ErrorMsg string
/*
- "No + TaskIdKey + key found in request header"
- "No + RequestIdKey + " key found in request header"
- "No + TaskIdKey + value found in request header"
- "No + RequestIdKey + " value found in request header"
- "No + UIDkey + key found in request header"
- "No + UIDkey + value found in request header"
- "Uid value strconv atoi error is: \%s"
- "Query db error is: \%s"
- "Json marshal db content error is: \%s"
- "Can not find out the task id %s in result cache.""
- "Msg to kafka pb marshal error is: \%s"
- "sync call back error is: \%s"
- "sync response json marshal error is: \%s"
- "Callback proto unmarshal http body error is: \%s"
- "callback errror is: %s"
- "Http header in callback body marshal error is: %s"
- "Sync Or Async Return Json Unmarshal Error is: %s"
- "Task Return Json Marshal Error is: %s"
*/
const (
//http headers
NoTaskIdKeyFoundInHeader = "No " + TaskIdKey + " key found in request header"
NoRequestIdKeyFoundInHeader = "No " + RequestIdKey + " key found in request header"
NoTaskIdValueFoundInHeader = "No " + TaskIdKey + " value found in request header"
NoRequestIdValueFoundInHeader = "No " + RequestIdKey + " value found in request header"
NoUidKeyFoundInHeader = "No " + UIDkey + " key found in request header"
NoUidValueFoundInHeader = "No " + UIDkey + " value found in request header"
UidValueStrconvAtoiError = "Uid value strconv atoi error is: %s"
QueryDbError = "Query db error is: %s"
JsonMarshalDbContentError = "Json marshal db content error is: %s"
QueryTaskByIdError = "Can not find out the task id %s in result cache."
MsgToKafakaPbMarshalError = "Msg to kafka pb marshal error is: %s"
//call back
SynCallBackError = "Sync call back error is: %s"
SyncResponseJsonMarshalError = "Sync response json marshal error is: %s"
//callback
CallbackPbUnmarshalError = "Callback proto unmarshal http body error is: %s"
CallbackError = "Callback errror is: %s"
HttpHeaderInCallbackBodyMarshalError = "Http header in callback body marshal error is: %s"
//return ApiErrorF(c, fmt.Sprintf("cache.Query %v", err.Error()), baseAttributes)
//sync or async return
SyncOrAsyncReturnJsonUnmarshalError = "Sync Or Async Return Json Unmarshal Error is %s"
//return
TaskReturnJsonMarshalError = "Task Return Json Marshal Error is %s"
)
type ResponseJson struct {
Task ResponseTask `json:"task"`
Output json.RawMessage `json:"output"`
}
type ResponseTask struct {
TaskId string `json:"task_id"`
//TaskResult []byte `json:"task_result"`
TaskUid string `json:"task_uid"`
TaskFee string `json:"task_fee"`
IsSuccess bool `json:"is_success"`
TaskError string `json:"task_error"`
ExecCode string `json:"exec_code"`
//ExecCode int32 `json:"exec_code"`
// "exec_code":"",
ExecError string `json:"exec_error"`
//Api ApiError `json:"api"`
ApiError ApiError `json:"api_error"`
}
type ApiError struct {
RequestId string `json:"request_id"`
Msg string `json:"message"`
}
func NewError(msg, requestId string) ApiError {
return ApiError{
RequestId: requestId,
Msg: msg,
}
}
type SyncResponse struct {
TaskId string `json:"task_id"`
RequestId string `json:"request_id"`
EstimateExecTime int `json:"estimate_exec_time"`
MaxExecTime int `json:"max_exec_time"`
}
// {
// "message": "No API key found in request",
// "request_id": "fc567d7a9bd52a5f157d820627266c57"
// }
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment