Commit 027b53f4 authored by Cloud User's avatar Cloud User

api error

parent 17290af8
...@@ -598,24 +598,6 @@ func batchToQuestDb(done chan interface{}, reqStream chan pbUpstream.TaskContent ...@@ -598,24 +598,6 @@ func batchToQuestDb(done chan interface{}, reqStream chan pbUpstream.TaskContent
} }
} }
type ResponseJson struct {
Task ResponseTask `json:"task"`
Output json.RawMessage `json:"output"`
}
type ResponseTask struct {
TaskId string `json:"task_id"`
//TaskResult []byte `json:"task_result"`
TaskUid string `json:"task_uid"`
TaskFee string `json:"task_fee"`
IsSuccess bool `json:"is_success"`
TaskError string `json:"task_error"`
ExecCode string `json:"exec_code"`
//ExecCode int32 `json:"exec_code"`
// "exec_code":"",
ExecError string `json:"exec_error"`
}
func newCache(redisAddr, redisPass, mysqlIP, dbName, user, passwd string, port int) *cachedata.CacheData { func newCache(redisAddr, redisPass, mysqlIP, dbName, user, passwd string, port int) *cachedata.CacheData {
_cache := cachedata.NewCacheData(context.Background(), cachedata.RedisConnParam{ _cache := cachedata.NewCacheData(context.Background(), cachedata.RedisConnParam{
...@@ -959,10 +941,13 @@ func main() { ...@@ -959,10 +941,13 @@ func main() {
if RequestIdHeaders, ok := reqHeaders[RequestId]; ok { if RequestIdHeaders, ok := reqHeaders[RequestId]; ok {
if RequestIdHeaders == nil || len(RequestIdHeaders) == 0 { if RequestIdHeaders == nil || len(RequestIdHeaders) == 0 {
return c.SendString(fmt.Sprintf("Please provide http header %s value, right now the value is %v.", RequestId, RequestIdHeaders)) return ApiErrorF(c, fmt.Sprintf("Please provide http header %s value, right now the value is %v.", RequestId, RequestIdHeaders), baseAttributes)
} }
} else { } else {
return c.SendString(fmt.Sprintf("Please provide http header %s .", RequestId))
return ApiErrorF(c, fmt.Sprintf("Please provide http header %s .", RequestId), attributes)
//return c.SendString(fmt.Sprintf("Please provide http header %s .", RequestId))
} }
slogfiber.AddCustomAttributes(c, slog.String(RequestId, reqHeaders[RequestId][0])) slogfiber.AddCustomAttributes(c, slog.String(RequestId, reqHeaders[RequestId][0]))
...@@ -975,7 +960,8 @@ func main() { ...@@ -975,7 +960,8 @@ func main() {
slog.LogAttrs(c.UserContext(), slog.LevelInfo, "http api query task", newAttributes...) slog.LogAttrs(c.UserContext(), slog.LevelInfo, "http api query task", newAttributes...)
if len(taskId) == 0 { if len(taskId) == 0 {
return c.SendString(fmt.Sprintf("%s must provide task id param for route %s ; e.g. http://127.0.0.1/query/v1/e45b5ebc-c71e-4ab8-b10f-d1202e7fb16e ", c.Path(), c.Route().Path)) return ApiErrorF(c, fmt.Sprintf("%s must provide task id param for route %s ; e.g. http://127.0.0.1/query/v1/e45b5ebc-c71e-4ab8-b10f-d1202e7fb16e ", c.Path(), c.Route().Path), attributes)
//return c.SendString(fmt.Sprintf("%s must provide task id param for route %s ; e.g. http://127.0.0.1/query/v1/e45b5ebc-c71e-4ab8-b10f-d1202e7fb16e ", c.Path(), c.Route().Path))
} }
if res, ok := getAsyncRes(taskId); ok { if res, ok := getAsyncRes(taskId); ok {
...@@ -999,7 +985,9 @@ func main() { ...@@ -999,7 +985,9 @@ func main() {
return syncOrAsyncReturn(c, res, taskId, baseAttributes) return syncOrAsyncReturn(c, res, taskId, baseAttributes)
} }
return c.SendString(fmt.Sprintf("can not find out the task id %s in result cache.", taskId)) return ApiErrorF(c, fmt.Sprintf("can not find out the task id %s in result cache.", taskId), attributes)
//return c.SendString(fmt.Sprintf("can not find out the task id %s in result cache.", taskId))
}) })
cfg := slogfiber.Config{ cfg := slogfiber.Config{
...@@ -1028,7 +1016,8 @@ func main() { ...@@ -1028,7 +1016,8 @@ func main() {
var resbody pbUpstream.TaskResponse var resbody pbUpstream.TaskResponse
if err := gogoPbProto.Unmarshal(body, &resbody); err != nil { if err := gogoPbProto.Unmarshal(body, &resbody); err != nil {
return c.SendString(fmt.Sprintf("callback Unmarshal error %v", err.Error())) return ApiErrorF(c, fmt.Sprintf("callback Unmarshal error %v", err.Error()), attributes)
//return c.SendString(fmt.Sprintf("callback Unmarshal error %v", err.Error()))
} }
slogfiber.AddCustomAttributes(c, slog.String(TaskIdAtrr, resbody.TaskId)) slogfiber.AddCustomAttributes(c, slog.String(TaskIdAtrr, resbody.TaskId))
...@@ -1081,28 +1070,32 @@ func ApiOrJWT(c *fiber.Ctx) error { ...@@ -1081,28 +1070,32 @@ func ApiOrJWT(c *fiber.Ctx) error {
if taskHeaders, ok := reqHeaders[TaskIdAtrr]; ok { if taskHeaders, ok := reqHeaders[TaskIdAtrr]; ok {
if taskHeaders == nil || len(taskHeaders) == 0 { if taskHeaders == nil || len(taskHeaders) == 0 {
return c.SendString(fmt.Sprintf("Please provide http header %s value, right now the value is %v.", TaskIdAtrr, taskHeaders)) return ApiErrorF(c, fmt.Sprintf(fmt.Sprintf("Please provide http header %s value, right now the value is %v.", TaskIdAtrr, taskHeaders)), nil)
//return c.SendString(fmt.Sprintf("Please provide http header %s value, right now the value is %v.", TaskIdAtrr, taskHeaders))
} }
} else { } else {
return c.SendString(fmt.Sprintf("Please provide http header %s .", TaskIdAtrr)) return ApiErrorF(c, fmt.Sprintf("Please provide http header %s .", TaskIdAtrr), nil)
//return c.SendString(fmt.Sprintf("Please provide http header %s .", TaskIdAtrr))
} }
baseAttributes := []slog.Attr{}
baseAttributes = append(baseAttributes, slog.String(TaskIdAtrr, reqHeaders[TaskIdAtrr][0]))
slogfiber.AddCustomAttributes(c, slog.String(TaskIdAtrr, reqHeaders[TaskIdAtrr][0])) slogfiber.AddCustomAttributes(c, slog.String(TaskIdAtrr, reqHeaders[TaskIdAtrr][0]))
if RequestIdHeaders, ok := reqHeaders[RequestId]; ok { if RequestIdHeaders, ok := reqHeaders[RequestId]; ok {
if RequestIdHeaders == nil || len(RequestIdHeaders) == 0 { if RequestIdHeaders == nil || len(RequestIdHeaders) == 0 {
return c.SendString(fmt.Sprintf("Please provide http header %s value, right now the value is %v.", RequestId, RequestIdHeaders)) return ApiErrorF(c, fmt.Sprintf("Please provide http header %s value, right now the value is %v.", RequestId, RequestIdHeaders), nil)
//return c.SendString(fmt.Sprintf("Please provide http header %s value, right now the value is %v.", RequestId, RequestIdHeaders))
} }
} else { } else {
return c.SendString(fmt.Sprintf("Please provide http header %s .", RequestId)) return ApiErrorF(c, fmt.Sprintf("Please provide http header %s .", RequestId), baseAttributes)
//return c.SendString(fmt.Sprintf("Please provide http header %s .", RequestId))
} }
slogfiber.AddCustomAttributes(c, slog.String(RequestId, reqHeaders[RequestId][0])) slogfiber.AddCustomAttributes(c, slog.String(RequestId, reqHeaders[RequestId][0]))
//slogfiber.AddCustomAttributes(c, slog.String(TaskIdAtrr, reqHeaders[TaskIdAtrr][0])) //slogfiber.AddCustomAttributes(c, slog.String(TaskIdAtrr, reqHeaders[TaskIdAtrr][0]))
baseAttributes := []slog.Attr{}
baseAttributes = append(baseAttributes, slog.String(TaskIdAtrr, reqHeaders[TaskIdAtrr][0]))
baseAttributes = append(baseAttributes, slog.String(RequestId, reqHeaders[RequestId][0])) baseAttributes = append(baseAttributes, slog.String(RequestId, reqHeaders[RequestId][0]))
newAttributes := append(baseAttributes, slog.String("match_path", c.Route().Path)) newAttributes := append(baseAttributes, slog.String("match_path", c.Route().Path))
...@@ -1120,11 +1113,12 @@ func ApiOrJWT(c *fiber.Ctx) error { ...@@ -1120,11 +1113,12 @@ func ApiOrJWT(c *fiber.Ctx) error {
uid := reqHeaders["X-Consumer-Custom-Id"] uid := reqHeaders["X-Consumer-Custom-Id"]
if uid == nil { if uid == nil {
return c.SendString(fmt.Sprintf("uid can not be nil")) return ApiErrorF(c, fmt.Sprintf("uid can not be nil"), baseAttributes)
//return c.SendString(fmt.Sprintf("uid can not be nil"))
} }
if len(uid) == 0 { if len(uid) == 0 {
return c.SendString(fmt.Sprintf("len(uid) can not be 0")) return ApiErrorF(c, fmt.Sprintf("len(uid) can not be 0"), baseAttributes)
} }
uidAsInt, err := strconv.Atoi(uid[0]) uidAsInt, err := strconv.Atoi(uid[0])
...@@ -1153,7 +1147,8 @@ func ApiOrJWT(c *fiber.Ctx) error { ...@@ -1153,7 +1147,8 @@ func ApiOrJWT(c *fiber.Ctx) error {
task, err = cache.Query(pathInDB, int64(uidAsInt)) task, err = cache.Query(pathInDB, int64(uidAsInt))
if err != nil { if err != nil {
return c.SendString(fmt.Sprintf("cache.Query %v", err.Error())) return ApiErrorF(c, fmt.Sprintf("cache.Query %v", err.Error()), baseAttributes)
//return c.SendString(fmt.Sprintf("cache.Query %v", err.Error()))
} }
} else { } else {
task = replanceQueryTask task = replanceQueryTask
...@@ -1239,7 +1234,8 @@ func ApiOrJWT(c *fiber.Ctx) error { ...@@ -1239,7 +1234,8 @@ func ApiOrJWT(c *fiber.Ctx) error {
if err != nil { if err != nil {
slog.LogAttrs(context.Background(), slog.LevelError, "ApiOrJWT", append([]slog.Attr{}, slog.String("pbMarshal", err.Error()))...) slog.LogAttrs(context.Background(), slog.LevelError, "ApiOrJWT", append([]slog.Attr{}, slog.String("pbMarshal", err.Error()))...)
return c.SendString(fmt.Sprintf("pb error: %v", err.Error())) return ApiErrorF(c, fmt.Sprintf("pb error: %v", err.Error()), baseAttributes)
//return c.SendString(fmt.Sprintf("pb error: %v", err.Error()))
} }
producerMessagesBytes <- bytesAndHeader{ producerMessagesBytes <- bytesAndHeader{
...@@ -1278,7 +1274,9 @@ func ApiOrJWT(c *fiber.Ctx) error { ...@@ -1278,7 +1274,9 @@ func ApiOrJWT(c *fiber.Ctx) error {
//return c.Send(m) //return c.Send(m)
asyncReq(pbMsg.TaskId) asyncReq(pbMsg.TaskId)
return c.SendString(pbMsg.TaskId) return c.JSON(pbMsg.TaskId)
//return c.SendString(pbMsg.TaskId)
} else { } else {
return syncModeF(c, pbMsg.TaskId) return syncModeF(c, pbMsg.TaskId)
...@@ -1333,9 +1331,10 @@ func syncOrAsyncReturn(c *fiber.Ctx, resAsPb pbUpstream.TaskResponse, reqTaskId ...@@ -1333,9 +1331,10 @@ func syncOrAsyncReturn(c *fiber.Ctx, resAsPb pbUpstream.TaskResponse, reqTaskId
if err := json.Unmarshal(resAsPb.TaskResultHeader, &headers); err != nil { if err := json.Unmarshal(resAsPb.TaskResultHeader, &headers); err != nil {
slog.LogAttrs(context.Background(), slog.LevelError, "syncOrAsyncReturn", append([]slog.Attr{}, slog.String("reqTaskId", reqTaskId), slog.String(TaskIdAtrr, resAsPb.TaskId), slog.String("json.Unmarshal", err.Error()))...) slog.LogAttrs(context.Background(), slog.LevelError, "syncOrAsyncReturn", append([]slog.Attr{}, slog.String("reqTaskId", reqTaskId), slog.String(TaskIdAtrr, resAsPb.TaskId), slog.String("json.Unmarshal", err.Error()))...)
return ApiErrorF(c, fmt.Sprintf("json.Unmarshal(resAsPb.TaskResultHeader error: %v", err.Error()), baseAttributes)
//slog.Error("syncModeF", "json.Unmarshal(resAsPb.TaskResultHeader", err.Error()) //slog.Error("syncModeF", "json.Unmarshal(resAsPb.TaskResultHeader", err.Error())
return c.SendString(fmt.Sprintf("json.Unmarshal(resAsPb.TaskResultHeader error: %v", err.Error())) //return c.SendString(fmt.Sprintf("json.Unmarshal(resAsPb.TaskResultHeader error: %v", err.Error()))
} }
for k, vs := range headers { for k, vs := range headers {
...@@ -1439,6 +1438,74 @@ func Return(c *fiber.Ctx, resAsPb pbUpstream.TaskResponse, baseAttributes []slog ...@@ -1439,6 +1438,74 @@ func Return(c *fiber.Ctx, resAsPb pbUpstream.TaskResponse, baseAttributes []slog
slog.LogAttrs(c.UserContext(), slog.LevelInfo, "http return", newAttributes...) slog.LogAttrs(c.UserContext(), slog.LevelInfo, "http return", newAttributes...)
return c.JSON(resAsJson) return c.Status(int(resAsPb.TaskResultCode)).JSON(resAsJson)
//return c.JSON(resAsJson)
}
type ResponseJson struct {
Task ResponseTask `json:"task"`
Output json.RawMessage `json:"output"`
}
type ResponseTask struct {
TaskId string `json:"task_id"`
//TaskResult []byte `json:"task_result"`
TaskUid string `json:"task_uid"`
TaskFee string `json:"task_fee"`
IsSuccess bool `json:"is_success"`
TaskError string `json:"task_error"`
ExecCode string `json:"exec_code"`
//ExecCode int32 `json:"exec_code"`
// "exec_code":"",
ExecError string `json:"exec_error"`
//Api ApiError `json:"api"`
ApiError string `json:"api_error"`
}
type ApiError struct {
Code int `json:"code"`
Msg string `json:"msg"`
}
func ApiErrorF(c *fiber.Ctx, msg string, baseAttributes []slog.Attr) error {
resAsJson := ResponseJson{
Task: ResponseTask{
IsSuccess: false,
ApiError: msg,
},
}
resbuffer := &bytes.Buffer{}
encoder := json.NewEncoder(resbuffer)
encoder.SetEscapeHTML(false)
//fileAsJsonEscapeHTML,err := EncodeJsonEscapeHTML(fileAsJsonBytes)
if err := encoder.Encode(resAsJson); err != nil {
newAttributes := append(baseAttributes, slog.String("err", err.Error()))
slog.LogAttrs(c.UserContext(), slog.LevelError, "encoder.Encode(resAsJson)", newAttributes...)
return err
}
newAttributes := append(baseAttributes,
slog.String("TaskId", resAsJson.Task.TaskId),
slog.String("TaskUid", resAsJson.Task.TaskUid),
slog.String("TaskFee", resAsJson.Task.TaskFee),
slog.Bool("IsSuccess", resAsJson.Task.IsSuccess),
slog.String("TaskError", resAsJson.Task.TaskError),
slog.String("ExecCode", resAsJson.Task.ExecCode),
slog.String("ExecError", resAsJson.Task.ExecError),
slog.String("ApiError", resAsJson.Task.ApiError))
slog.LogAttrs(c.UserContext(), slog.LevelInfo, "http return api error", newAttributes...)
if err := c.Status(fiber.StatusOK).JSON(resAsJson); err != nil {
newAttributes := append(baseAttributes, slog.String("err", err.Error()))
slog.LogAttrs(c.UserContext(), slog.LevelError, "c.Status(fiber.StatusOK).JSON(resAsJson)", newAttributes...)
return err
}
return nil
} }
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment