Commit 5ed39013 authored by Your Name's avatar Your Name

http redirect

parent 0a2a6811
...@@ -9,40 +9,71 @@ import ( ...@@ -9,40 +9,71 @@ import (
type apiQueryTxsForAddr struct { type apiQueryTxsForAddr struct {
uuid string uuid string
res chan pbUpstream.TaskResponse res chan pbUpstream.TaskResponse
async bool
} }
var ApiQueryTxsByAddrForQueue = make(chan apiQueryTxsForAddr, 1000) var ApiQueryTxsByAddrForQueue = make(chan apiQueryTxsForAddr, 1000)
func req(uuid string) chan pbUpstream.TaskResponse { func syncReq(uuid string) chan pbUpstream.TaskResponse {
//ApiQueryTxsByAddrForQueue
res := make(chan pbUpstream.TaskResponse) res := make(chan pbUpstream.TaskResponse)
ApiQueryTxsByAddrForQueue <- apiQueryTxsForAddr{ ApiQueryTxsByAddrForQueue <- apiQueryTxsForAddr{
uuid: uuid, uuid: uuid,
res: res, res: res,
async: false,
} }
return res return res
}
func asyncReq(uuid string) {
//ApiQueryTxsByAddrForQueue
ApiQueryTxsByAddrForQueue <- apiQueryTxsForAddr{
uuid: uuid,
async: true,
}
} }
func res(res pbUpstream.TaskResponse) { func callbackRes(res pbUpstream.TaskResponse) {
if v, ok := recordmap.LoadAndDelete(res.TaskId); ok { if v, ok := recordmap.LoadAndDelete(res.TaskId); ok {
resAsV, ok := v.(chan pbUpstream.TaskResponse) //resAsV, ok := v.(chan pbUpstream.TaskResponse)
resAsV, ok := v.(apiQueryTxsForAddr)
if ok { if ok {
resAsV <- res if !resAsV.async {
resAsV.res <- res
} else {
asyncmap.Store(res.TaskId, res)
}
} }
} }
} }
func getAsyncRes(uuid string) (pbUpstream.TaskResponse, bool) {
if v, ok := asyncmap.LoadAndDelete(uuid); ok {
if resAsV, ok := v.(pbUpstream.TaskResponse); ok {
return resAsV, true
}
}
return pbUpstream.TaskResponse{}, false
}
var recordmap sync.Map var recordmap sync.Map
var asyncmap sync.Map
func recordUUID() { func recordUUID() {
for message := range ApiQueryTxsByAddrForQueue { for message := range ApiQueryTxsByAddrForQueue {
recordmap.Store(message.uuid, message.res)
//apiQueryTxsForAddr
//recordmap.Store(message.uuid, message.res)
recordmap.Store(message.uuid, message)
} }
} }
......
...@@ -448,6 +448,7 @@ func batchToQuestDb(done chan interface{}, reqStream chan pbUpstream.TaskContent ...@@ -448,6 +448,7 @@ func batchToQuestDb(done chan interface{}, reqStream chan pbUpstream.TaskContent
} else { } else {
break break
} }
} }
// Make sure to close the sender on exit to release resources. // Make sure to close the sender on exit to release resources.
defer sender.Close() defer sender.Close()
...@@ -591,12 +592,12 @@ func batchToQuestDb(done chan interface{}, reqStream chan pbUpstream.TaskContent ...@@ -591,12 +592,12 @@ func batchToQuestDb(done chan interface{}, reqStream chan pbUpstream.TaskContent
} }
type ResponseJson struct { type ResponseJson struct {
TaskId string TaskId string `json:"task_id"`
TaskResult []byte TaskResult []byte `json:"task_result"`
TaskUid string TaskUid string `json:"task_uid"`
TaskFee string TaskFee string `json:"task_fee"`
IsSuccess bool IsSuccess bool `json:"is_success"`
TaskError string TaskError string `json:"task_error"`
} }
func newCache(redisAddr, redisPass, mysqlIP, dbName, user, passwd string, port int) *cachedata.CacheData { func newCache(redisAddr, redisPass, mysqlIP, dbName, user, passwd string, port int) *cachedata.CacheData {
...@@ -906,7 +907,6 @@ func main() { ...@@ -906,7 +907,6 @@ func main() {
apiGroup := app.Group("/api") apiGroup := app.Group("/api")
jwtGroup := app.Group("/jwt") jwtGroup := app.Group("/jwt")
callbackGroup := app.Group("/callback")
apiGroupV1 := apiGroup.Group("/v1") apiGroupV1 := apiGroup.Group("/v1")
apiGroupV1.Post("/*", slogfiber.NewWithConfig(slog.Default(), cfg), ApiOrJWT) apiGroupV1.Post("/*", slogfiber.NewWithConfig(slog.Default(), cfg), ApiOrJWT)
...@@ -914,6 +914,26 @@ func main() { ...@@ -914,6 +914,26 @@ func main() {
jwtGroupV1 := jwtGroup.Group("/v1") jwtGroupV1 := jwtGroup.Group("/v1")
jwtGroupV1.Post("/*", ApiOrJWT) jwtGroupV1.Post("/*", ApiOrJWT)
//curl -X GET http://127.0.0.1:4000/api/jobs?id=e45b5ebc-c71e-4ab8-b10f-d1202e7fb16e
queryGroup := app.Group("/query")
queryGroupV1 := queryGroup.Group("/v1")
queryGroupV1.Post("/:taskId", func(c *fiber.Ctx) error {
taskId := c.Params("taskId")
if len(taskId) == 0 {
return c.SendString(fmt.Sprintf("%s must provide task id param for route %s ; e.g. http://127.0.0.1/query/v1/e45b5ebc-c71e-4ab8-b10f-d1202e7fb16e ", c.Path(), c.Route().Path))
}
if res, ok := getAsyncRes(taskId); ok {
return syncOrAsyncReturn(c, res, taskId)
}
return c.SendString(fmt.Sprintf("can not find out the task id %s in result cache."))
})
callbackGroup := app.Group("/callback")
callbackGroupV1 := callbackGroup.Group("/v1") callbackGroupV1 := callbackGroup.Group("/v1")
callbackGroupV1.Post("/", func(c *fiber.Ctx) error { callbackGroupV1.Post("/", func(c *fiber.Ctx) error {
...@@ -932,7 +952,7 @@ func main() { ...@@ -932,7 +952,7 @@ func main() {
slog.LogAttrs(c.UserContext(), slog.LevelDebug, "callback", append(baseAttributes, slog.Bool("TaskIsSucceed", resbody.TaskIsSucceed), slog.String(TaskIdAtrr, resbody.TaskId), slog.String("TaskUid", resbody.TaskUid))...) slog.LogAttrs(c.UserContext(), slog.LevelDebug, "callback", append(baseAttributes, slog.Bool("TaskIsSucceed", resbody.TaskIsSucceed), slog.String(TaskIdAtrr, resbody.TaskId), slog.String("TaskUid", resbody.TaskUid))...)
res(resbody) callbackRes(resbody)
return c.SendStatus(200) return c.SendStatus(200)
}) })
...@@ -1026,15 +1046,34 @@ func ApiOrJWT(c *fiber.Ctx) error { ...@@ -1026,15 +1046,34 @@ func ApiOrJWT(c *fiber.Ctx) error {
} }
} else { } else {
task = replanceQueryTask task = replanceQueryTask
} }
//task.ResultFileExpires
//json:"task_result_body,omitempty"
reqHeaders["ResultFileExpiresDB"] = []string{fmt.Sprintf("%d", task.ResultFileExpires)}
reqRelayAsJson, err := json.Marshal(struct {
Headers map[string][]string `json:"headers"`
Queries map[string]string `json:"queries"`
Body []byte `json:"body"`
}{
Headers: reqHeaders,
Queries: c.Queries(),
Body: c.Body(),
})
// c.Protocol()
// map[string][]string
// map[string]string
pbMsg := pbUpstream.TaskContent{ pbMsg := pbUpstream.TaskContent{
TaskId: reqHeaders["Task-Id"][0], TaskId: reqHeaders["Task-Id"][0],
TaskType: uint64(task.ID), TaskType: uint64(task.ID),
TaskKind: pbUpstream.TaskKind(task.Kind), TaskKind: pbUpstream.TaskKind(task.Kind),
TaskCmd: task.Cmd, TaskCmd: task.Cmd,
TaskParam: c.Body(), //[]byte(reqHeaders["Task-Id"][0]), TaskParam: reqRelayAsJson, //[]byte(reqHeaders["Task-Id"][0]),
TaskTimestamp: uint64(time.Now().UnixNano()), TaskTimestamp: uint64(time.Now().UnixNano()),
TaskCallback: "http://" + callbackAddr + "/callback/v1", //"http://192.168.1.10:6001/callback/v1", TaskCallback: "http://" + callbackAddr + "/callback/v1", //"http://192.168.1.10:6001/callback/v1",
TaskUid: reqHeaders["X-Consumer-Custom-Id"][0], TaskUid: reqHeaders["X-Consumer-Custom-Id"][0],
...@@ -1049,6 +1088,7 @@ func ApiOrJWT(c *fiber.Ctx) error { ...@@ -1049,6 +1088,7 @@ func ApiOrJWT(c *fiber.Ctx) error {
slog.Uint64("TaskType", pbMsg.TaskType), slog.Uint64("TaskType", pbMsg.TaskType),
slog.Int("TaskType", int(pbMsg.TaskKind)), slog.Int("TaskType", int(pbMsg.TaskKind)),
slog.String("TaskCmd", pbMsg.TaskCmd), slog.String("TaskCmd", pbMsg.TaskCmd),
slog.String("TaskParam", string(pbMsg.TaskParam)),
} }
kafkaattributes := append( kafkaattributes := append(
...@@ -1113,12 +1153,63 @@ func ApiOrJWT(c *fiber.Ctx) error { ...@@ -1113,12 +1153,63 @@ func ApiOrJWT(c *fiber.Ctx) error {
//time.Sleep(10 * time.Second) //time.Sleep(10 * time.Second)
//m := make([]byte, 1024*1024+1024*512) //m := make([]byte, 1024*1024+1024*512)
//return c.Send(m) //return c.Send(m)
asyncReq(pbMsg.TaskId)
return c.SendString(pbMsg.TaskId) return c.SendString(pbMsg.TaskId)
} else { } else {
return syncModeF(c, pbMsg.TaskId) return syncModeF(c, pbMsg.TaskId)
} }
}
//fmt.Println("syncMode-------------", syncMode) func syncModeF(c *fiber.Ctx, taskid string) error {
wait := syncReq(taskid)
resAsPb := <-wait
baseAttributes := []slog.Attr{}
baseAttributes = append(baseAttributes, slog.String(TaskIdAtrr, taskid))
newAttributes := append(baseAttributes, slog.String("TaskResultHeader", string(resAsPb.TaskResultHeader)))
slog.LogAttrs(c.UserContext(), slog.LevelInfo, "new Api or JWT reuqest", newAttributes...)
return syncOrAsyncReturn(c, resAsPb, taskid)
//slog.Debug("resAsPb.TaskResultHeader", "resAsPb.TaskResultHeader", resAsPb.TaskResultHeader)
// if resAsPb.TaskResultHeader != nil {
// if len(resAsPb.TaskResultHeader) != 0 {
// headers := make(map[string][]string)
// if err := json.Unmarshal(resAsPb.TaskResultHeader, &headers); err != nil {
// slog.LogAttrs(context.Background(), slog.LevelError, "syncModeF", append([]slog.Attr{}, slog.String(TaskIdAtrr, taskid), slog.String("json.Unmarshal", err.Error()))...)
// //slog.Error("syncModeF", "json.Unmarshal(resAsPb.TaskResultHeader", err.Error())
// return c.SendString(fmt.Sprintf("json.Unmarshal(resAsPb.TaskResultHeader error: %v", err.Error()))
// }
// for k, vs := range headers {
// for _, v := range vs {
// if k == "Content-Encoding" {
// c.Response().Header.SetContentEncoding(v)
// }
// if k == "Content-Type" {
// c.Response().Header.SetContentType(v)
// }
// }
// }
// }
// }
// if resAsPb.TaskIsSucceed {
// return c.Send(resAsPb.GetTaskResultBody())
// }
// //fmt.Println("syncMode-------------", syncMode)
// resAsJson := ResponseJson{ // resAsJson := ResponseJson{
// TaskId: resAsPb.TaskId, // TaskId: resAsPb.TaskId,
// TaskResult: resAsPb.GetTaskResultBody(), // TaskResult: resAsPb.GetTaskResultBody(),
...@@ -1128,24 +1219,20 @@ func ApiOrJWT(c *fiber.Ctx) error { ...@@ -1128,24 +1219,20 @@ func ApiOrJWT(c *fiber.Ctx) error {
// TaskError: resAsPb.TaskError, // TaskError: resAsPb.TaskError,
// } // }
//return c.Send(resAsPb.GetTaskResultBody()) // return c.JSON(resAsJson)
//return c.JSON(resAsJson)
} }
func syncModeF(c *fiber.Ctx, taskid string) error { func syncOrAsyncReturn(c *fiber.Ctx, resAsPb pbUpstream.TaskResponse, reqTaskId string) error {
wait := req(taskid) // 303 (See Other) responses always lead to the use of a GET method.
resAsPb := <-wait // 307 (Temporary Redirect) and 308 (Permanent Redirect) don't change the method used in the original request.
// 301 (Moved Permanently) and 302 (Found) don't change the method most of the time, though older user-agents may (so you basically don't know).
baseAttributes := []slog.Attr{} redirectCode := false
baseAttributes = append(baseAttributes, slog.String(TaskIdAtrr, taskid)) if resAsPb.TaskResultCode == 301 || resAsPb.TaskResultCode == 302 || resAsPb.TaskResultCode == 303 || resAsPb.TaskResultCode == 307 || resAsPb.TaskResultCode == 308 {
newAttributes := append(baseAttributes, slog.String("TaskResultHeader", string(resAsPb.TaskResultHeader)))
slog.LogAttrs(c.UserContext(), slog.LevelInfo, "new Api or JWT reuqest", newAttributes...)
//slog.Debug("resAsPb.TaskResultHeader", "resAsPb.TaskResultHeader", resAsPb.TaskResultHeader) redirectCode = true
}
if resAsPb.TaskResultHeader != nil { if resAsPb.TaskResultHeader != nil {
if len(resAsPb.TaskResultHeader) != 0 { if len(resAsPb.TaskResultHeader) != 0 {
...@@ -1154,7 +1241,7 @@ func syncModeF(c *fiber.Ctx, taskid string) error { ...@@ -1154,7 +1241,7 @@ func syncModeF(c *fiber.Ctx, taskid string) error {
if err := json.Unmarshal(resAsPb.TaskResultHeader, &headers); err != nil { if err := json.Unmarshal(resAsPb.TaskResultHeader, &headers); err != nil {
slog.LogAttrs(context.Background(), slog.LevelError, "syncModeF", append([]slog.Attr{}, slog.String(TaskIdAtrr, taskid), slog.String("json.Unmarshal", err.Error()))...) slog.LogAttrs(context.Background(), slog.LevelError, "syncOrAsyncReturn", append([]slog.Attr{}, slog.String("reqTaskId", reqTaskId), slog.String(TaskIdAtrr, resAsPb.TaskId), slog.String("json.Unmarshal", err.Error()))...)
//slog.Error("syncModeF", "json.Unmarshal(resAsPb.TaskResultHeader", err.Error()) //slog.Error("syncModeF", "json.Unmarshal(resAsPb.TaskResultHeader", err.Error())
return c.SendString(fmt.Sprintf("json.Unmarshal(resAsPb.TaskResultHeader error: %v", err.Error())) return c.SendString(fmt.Sprintf("json.Unmarshal(resAsPb.TaskResultHeader error: %v", err.Error()))
...@@ -1170,10 +1257,63 @@ func syncModeF(c *fiber.Ctx, taskid string) error { ...@@ -1170,10 +1257,63 @@ func syncModeF(c *fiber.Ctx, taskid string) error {
if k == "Content-Type" { if k == "Content-Type" {
c.Response().Header.SetContentType(v) c.Response().Header.SetContentType(v)
} }
if redirectCode {
if k == "Location" {
c.Response().Header.SetContentType(v)
}
} }
} }
} }
} }
}
if redirectCode {
return redirectReturn(c, resAsPb)
}
return Return(c, resAsPb)
}
func Return(c *fiber.Ctx, resAsPb pbUpstream.TaskResponse) error {
if resAsPb.TaskIsSucceed {
return c.Send(resAsPb.GetTaskResultBody()) return c.Send(resAsPb.GetTaskResultBody())
}
//fmt.Println("syncMode-------------", syncMode)
resAsJson := ResponseJson{
TaskId: resAsPb.TaskId,
TaskResult: resAsPb.GetTaskResultBody(),
TaskUid: resAsPb.TaskUid,
TaskFee: resAsPb.TaskFee,
IsSuccess: resAsPb.TaskIsSucceed,
TaskError: resAsPb.TaskError,
}
return c.JSON(resAsJson)
}
func redirectReturn(c *fiber.Ctx, resAsPb pbUpstream.TaskResponse) error {
if resAsPb.TaskIsSucceed {
return c.Status(int(resAsPb.TaskResultCode)).Send(resAsPb.GetTaskResultBody())
}
//fmt.Println("syncMode-------------", syncMode)
resAsJson := ResponseJson{
TaskId: resAsPb.TaskId,
TaskResult: resAsPb.GetTaskResultBody(),
TaskUid: resAsPb.TaskUid,
TaskFee: resAsPb.TaskFee,
IsSuccess: resAsPb.TaskIsSucceed,
TaskError: resAsPb.TaskError,
}
return c.Status(int(resAsPb.TaskResultCode)).JSON(resAsJson)
} }
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment