Commit 467ec439 authored by duanjinfei's avatar duanjinfei

parse container resp data

parent a3c752ba
...@@ -233,20 +233,53 @@ func (t *TaskHandler) ComputeTaskHandler(taskMsg *nodeManagerV1.PushTaskMessage) ...@@ -233,20 +233,53 @@ func (t *TaskHandler) ComputeTaskHandler(taskMsg *nodeManagerV1.PushTaskMessage)
} }
log.WithField("isUseFileCache", isUseFileCache).Info("is use file cache") log.WithField("isUseFileCache", isUseFileCache).Info("is use file cache")
if isUseFileCache && readBody != nil { if isUseFileCache && readBody != nil {
containerResp := &models.ModelResponse{} respBody := make([]byte, 0)
//log.WithField("task resp", readBody).Info("received response") base64ImageStr := ""
err = json.Unmarshal(readBody, &containerResp) data := parseData(readBody)
if err != nil { if data != nil {
log.WithError(err).Error("Error unmarshalling oss resp body failed") switch v := data.(type) {
return case [][]string:
} {
if len(containerResp.Output) == 1 { res := data.([][]string)
if utils.IsBase64ImageStr(containerResp.Output[0]) { if len(res) == 1 {
containerRespOutput := strings.SplitN(containerResp.Output[0], ",", 2) if len(res[0]) == 1 {
base64ImageStr = res[0][0]
}
}
for _, innerSlice := range res {
for _, str := range innerSlice {
respBody = append(respBody, []byte(str)...)
}
}
log.Info("data is [][]string type")
}
case []string:
{
res := data.([]string)
if len(res) == 1 {
base64ImageStr = res[0]
}
respBody = []byte(strings.Join(res, ""))
log.Info("data is []string type")
}
case string:
{
res := data.(string)
if len(res) == 1 {
base64ImageStr = res
}
respBody = []byte(res)
log.Info("data is string type")
}
default:
log.Error("data is unknown type", v)
}
if base64ImageStr != "" && utils.IsBase64ImageStr(base64ImageStr) {
containerRespOutput := strings.SplitN(base64ImageStr, ",", 2)
imageStr := containerRespOutput[1] imageStr := containerRespOutput[1]
queryString := utils.MatchFileCacheQueryString(taskParam.Headers, taskCmd.ImageName, t.DockerOp.ModelsInfo, containerRespOutput[0]) queryString := utils.MatchFileCacheQueryString(taskParam.Headers, taskCmd.ImageName, t.DockerOp.ModelsInfo, containerRespOutput[0])
//ossUri, err := t.uploadOSS(taskMsg.TaskId, queryString, containerResp.Output[0]) prefix := strings.Split(strings.Split(containerRespOutput[0], ";")[0], ":")[1]
ossUri, err := t.uploadOSS(taskMsg.TaskId, queryString, imageStr) ossUri, err := t.uploadOSS(taskMsg.TaskId, queryString, imageStr, prefix)
if err != nil { if err != nil {
log.WithError(err).Error("upload image into file cache failed") log.WithError(err).Error("upload image into file cache failed")
return return
...@@ -256,8 +289,12 @@ func (t *TaskHandler) ComputeTaskHandler(taskMsg *nodeManagerV1.PushTaskMessage) ...@@ -256,8 +289,12 @@ func (t *TaskHandler) ComputeTaskHandler(taskMsg *nodeManagerV1.PushTaskMessage)
taskExecResult.TaskHttpStatusCode = models.RedirectCode taskExecResult.TaskHttpStatusCode = models.RedirectCode
post.Header.Set("Location", ossUri) post.Header.Set("Location", ossUri)
} }
} else {
taskExecResult.TaskRespBody = respBody
} }
} }
} else {
taskExecResult.TaskRespBody = readBody
} }
} }
headers, err := json.Marshal(post.Header) headers, err := json.Marshal(post.Header)
...@@ -336,13 +373,11 @@ func (t *TaskHandler) foundImageIsRunning(imageId string) (bool, string, uint16) ...@@ -336,13 +373,11 @@ func (t *TaskHandler) foundImageIsRunning(imageId string) (bool, string, uint16)
return false, "", 0 return false, "", 0
} }
func (t *TaskHandler) uploadOSS(taskId string, queries string, base64Image string) (string, error) { func (t *TaskHandler) uploadOSS(taskId string, queries string, base64Image, prefix string) (string, error) {
// todo: 解析结果
// TODO: 存储OSS
var requestBody bytes.Buffer var requestBody bytes.Buffer
writer := multipart.NewWriter(&requestBody) writer := multipart.NewWriter(&requestBody)
// 创建文件表单字段 // 创建文件表单字段
fileField, err := writer.CreateFormFile("file", fmt.Sprintf("%s.png", taskId)) fileField, err := writer.CreateFormFile("file", fmt.Sprintf("%s.%s", taskId, prefix))
if err != nil { if err != nil {
log.WithError(err).Error("Error creating form file") log.WithError(err).Error("Error creating form file")
return "", err return "", err
...@@ -383,3 +418,29 @@ func (t *TaskHandler) uploadOSS(taskId string, queries string, base64Image strin ...@@ -383,3 +418,29 @@ func (t *TaskHandler) uploadOSS(taskId string, queries string, base64Image strin
} }
return bytes.NewBuffer(ossRespBody).String(), nil return bytes.NewBuffer(ossRespBody).String(), nil
} }
func parseData(readBody []byte) interface{} {
var m map[string]json.RawMessage
if err := json.Unmarshal(readBody, &m); err != nil {
log.WithError(err).Error("Parse json raw message failed")
return nil
}
var outputTwoArray [][]string
if err := json.Unmarshal(m["output"], &outputTwoArray); err != nil {
log.WithField("err", err).Warn("parse two array output filed failed:")
var outputOneArray []string
if err := json.Unmarshal(m["output"], &outputOneArray); err != nil {
log.WithField("err", err).Warn("parse one array output filed failed:")
var outputString string
if err := json.Unmarshal(m["output"], &outputString); err != nil {
return nil
} else {
return outputString
}
} else {
return outputOneArray
}
} else {
return outputTwoArray
}
}
package test package test
import ( import (
"encoding/json"
"fmt" "fmt"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/crypto"
"testing" "testing"
) )
func Test_initConfig(t *testing.T) { func Test_initConfig(t *testing.T) {
prvKey, _ := crypto.HexToECDSA("e3b097b0c171e2489973a277b1546392db97e359505cd64b9b52966cb87a0f08") //prvKey, _ := crypto.HexToECDSA("e3b097b0c171e2489973a277b1546392db97e359505cd64b9b52966cb87a0f08")
fmt.Println("prvKey:", prvKey) //fmt.Println("prvKey:", prvKey)
pubKey := common.Bytes2Hex(crypto.FromECDSAPub(&prvKey.PublicKey)) //pubKey := common.Bytes2Hex(crypto.FromECDSAPub(&prvKey.PublicKey))
fmt.Println("pubKey:", pubKey) //fmt.Println("pubKey:", pubKey)
address := crypto.PubkeyToAddress(prvKey.PublicKey) //address := crypto.PubkeyToAddress(prvKey.PublicKey)
fmt.Println("address:", address) //fmt.Println("address:", address)
// JSON 2 数据
jsonData := `{
"completed_at": "2023-07-02T02:13:48.764861Z",
"output": ["ss","sss"]
}`
// 解析 JSON 数据到 map[string]json.RawMessage
var m map[string]json.RawMessage
if err := json.Unmarshal([]byte(jsonData), &m); err != nil {
fmt.Println("解析 JSON 数据时出错:", err)
return
}
// 解析 "output" 字段
//var output [][]string
//if err := json.Unmarshal(m["output"], &output); err != nil {
// fmt.Println("解析 output 字段时出错:", err)
// return
//}
var output []string
if err := json.Unmarshal(m["output"], &output); err != nil {
fmt.Println("解析 output 字段时出错:", err)
return
}
// 输出结果
fmt.Println("Output Type:", output)
} }
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment