Commit 9550b929 authored by duanjinfei's avatar duanjinfei

parse container resp data

parents 17648480 91df0ae1
...@@ -73,6 +73,7 @@ func inputNodeManagerChan(manager *NodeManager, nodeManagerClient *models.NodeMa ...@@ -73,6 +73,7 @@ func inputNodeManagerChan(manager *NodeManager, nodeManagerClient *models.NodeMa
IsSelected: isSelect, IsSelected: isSelect,
LastHeartTime: time.Now(), LastHeartTime: time.Now(),
} }
usedNodeManagerClient = append(usedNodeManagerClient, nodeManagerClient)
} }
serviceClient := operate.ConnNmGrpc(manager.Info.Endpoint) serviceClient := operate.ConnNmGrpc(manager.Info.Endpoint)
if serviceClient == nil { if serviceClient == nil {
...@@ -82,7 +83,6 @@ func inputNodeManagerChan(manager *NodeManager, nodeManagerClient *models.NodeMa ...@@ -82,7 +83,6 @@ func inputNodeManagerChan(manager *NodeManager, nodeManagerClient *models.NodeMa
nodeManagerClient.Status = true nodeManagerClient.Status = true
nodeManagerClient.Client = serviceClient nodeManagerClient.Client = serviceClient
nodeManagerClientChan <- nodeManagerClient nodeManagerClientChan <- nodeManagerClient
usedNodeManagerClient = append(usedNodeManagerClient, nodeManagerClient)
manager.IsUsed = true manager.IsUsed = true
return true return true
} }
...@@ -2,7 +2,6 @@ package nm ...@@ -2,7 +2,6 @@ package nm
import ( import (
"bytes" "bytes"
"encoding/base64"
"encoding/json" "encoding/json"
"example.com/m/conf" "example.com/m/conf"
"example.com/m/log" "example.com/m/log"
...@@ -233,31 +232,69 @@ func (t *TaskHandler) ComputeTaskHandler(taskMsg *nodeManagerV1.PushTaskMessage) ...@@ -233,31 +232,69 @@ func (t *TaskHandler) ComputeTaskHandler(taskMsg *nodeManagerV1.PushTaskMessage)
} }
log.WithField("isUseFileCache", isUseFileCache).Info("is use file cache") log.WithField("isUseFileCache", isUseFileCache).Info("is use file cache")
if isUseFileCache && readBody != nil { if isUseFileCache && readBody != nil {
containerResp := &models.ModelResponse{} respBody := make([]byte, 0)
//log.WithField("task resp", readBody).Info("received response") base64ImageStr := ""
err = json.Unmarshal(readBody, &containerResp) data := parseData(readBody)
if err != nil { if data != nil {
log.WithError(err).Error("Error unmarshalling oss resp body failed") switch v := data.(type) {
return case [][]string:
} {
if len(containerResp.Output) == 1 { res := data.([][]string)
if utils.IsBase64ImageStr(containerResp.Output[0]) { if len(res) == 1 {
containerRespOutput := strings.SplitN(containerResp.Output[0], ",", 2) if len(res[0]) == 1 {
imageStr := containerRespOutput[1] base64ImageStr = res[0][0]
queryString := utils.MatchFileCacheQueryString(taskParam.Headers, taskCmd.ImageName, t.DockerOp.ModelsInfo, containerRespOutput[0]) }
//ossUri, err := t.uploadOSS(taskMsg.TaskId, queryString, containerResp.Output[0]) }
ossUri, err := t.uploadOSS(taskMsg.TaskId, queryString, imageStr) for _, innerSlice := range res {
if err != nil { for _, str := range innerSlice {
log.WithError(err).Error("upload image into file cache failed") respBody = append(respBody, []byte(str)...)
return }
}
log.Info("data is [][]string type")
}
case []string:
{
res := data.([]string)
if len(res) == 1 {
base64ImageStr = res[0]
}
respBody = []byte(strings.Join(res, ""))
log.Info("data is []string type")
} }
log.WithField("uri", ossUri).Info("upload image OSS successful") case string:
if ossUri != "" { {
taskExecResult.TaskHttpStatusCode = models.RedirectCode res := data.(string)
post.Header.Set("Location", ossUri) base64ImageStr = res
respBody = []byte(res)
log.Info("data is string type")
} }
default:
log.Error("data is unknown type", v)
}
if base64ImageStr != "" {
isBase64, decodeByte, respFormat, suffix := utils.IsBase64ImageStr(base64ImageStr)
if isBase64 {
log.WithField("taskId", taskMsg.TaskId).WithField("format", respFormat).WithField("suffix", suffix).Info("Parse container resp")
queryString := utils.MatchFileCacheQueryString(taskParam.Headers, taskCmd.ImageName, t.DockerOp.ModelsInfo, respFormat)
ossUri, err := t.uploadOSS(taskMsg.TaskId, queryString, decodeByte, suffix)
if err != nil {
log.WithError(err).Error("upload image into file cache failed")
return
}
log.WithField("uri", ossUri).Info("upload image OSS successful")
if ossUri != "" {
taskExecResult.TaskHttpStatusCode = models.RedirectCode
post.Header.Set("Location", ossUri)
}
} else {
taskExecResult.TaskRespBody = respBody
}
} else {
taskExecResult.TaskRespBody = respBody
} }
} }
} else {
taskExecResult.TaskRespBody = readBody
} }
} }
headers, err := json.Marshal(post.Header) headers, err := json.Marshal(post.Header)
...@@ -336,23 +373,15 @@ func (t *TaskHandler) foundImageIsRunning(imageId string) (bool, string, uint16) ...@@ -336,23 +373,15 @@ func (t *TaskHandler) foundImageIsRunning(imageId string) (bool, string, uint16)
return false, "", 0 return false, "", 0
} }
func (t *TaskHandler) uploadOSS(taskId string, queries string, base64Image string) (string, error) { func (t *TaskHandler) uploadOSS(taskId string, queries string, decodedImage []byte, suffix string) (string, error) {
// todo: 解析结果
// TODO: 存储OSS
var requestBody bytes.Buffer var requestBody bytes.Buffer
writer := multipart.NewWriter(&requestBody) writer := multipart.NewWriter(&requestBody)
// 创建文件表单字段 // 创建文件表单字段
fileField, err := writer.CreateFormFile("file", fmt.Sprintf("%s.png", taskId)) fileField, err := writer.CreateFormFile("file", fmt.Sprintf("%s.%s", taskId, suffix))
if err != nil { if err != nil {
log.WithError(err).Error("Error creating form file") log.WithError(err).Error("Error creating form file")
return "", err return "", err
} }
// 将 base64 解码后的内容复制到表单字段
decodedImage, err := base64.StdEncoding.DecodeString(base64Image)
if err != nil {
log.WithError(err).Error("Error decoding base64 image")
return "", err
}
_, err = io.Copy(fileField, bytes.NewReader(decodedImage)) _, err = io.Copy(fileField, bytes.NewReader(decodedImage))
//_, err = io.Copy(fileField, strings.NewReader(base64Image)) //_, err = io.Copy(fileField, strings.NewReader(base64Image))
if err != nil { if err != nil {
...@@ -383,3 +412,29 @@ func (t *TaskHandler) uploadOSS(taskId string, queries string, base64Image strin ...@@ -383,3 +412,29 @@ func (t *TaskHandler) uploadOSS(taskId string, queries string, base64Image strin
} }
return bytes.NewBuffer(ossRespBody).String(), nil return bytes.NewBuffer(ossRespBody).String(), nil
} }
func parseData(readBody []byte) interface{} {
var m map[string]json.RawMessage
if err := json.Unmarshal(readBody, &m); err != nil {
log.WithError(err).Error("Parse json raw message failed")
return bytes.NewBuffer(readBody).String()
}
var outputTwoArray [][]string
if err := json.Unmarshal(m["output"], &outputTwoArray); err != nil {
log.WithField("err", err).Warn("parse two array output filed failed:")
var outputOneArray []string
if err := json.Unmarshal(m["output"], &outputOneArray); err != nil {
log.WithField("err", err).Warn("parse one array output filed failed:")
var outputString string
if err := json.Unmarshal(m["output"], &outputString); err != nil {
return nil
} else {
return outputString
}
} else {
return outputOneArray
}
} else {
return outputTwoArray
}
}
package test package test
import ( import (
"encoding/json"
"fmt" "fmt"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/crypto"
"testing" "testing"
) )
func Test_initConfig(t *testing.T) { func Test_initConfig(t *testing.T) {
prvKey, _ := crypto.HexToECDSA("e3b097b0c171e2489973a277b1546392db97e359505cd64b9b52966cb87a0f08") //prvKey, _ := crypto.HexToECDSA("e3b097b0c171e2489973a277b1546392db97e359505cd64b9b52966cb87a0f08")
fmt.Println("prvKey:", prvKey) //fmt.Println("prvKey:", prvKey)
pubKey := common.Bytes2Hex(crypto.FromECDSAPub(&prvKey.PublicKey)) //pubKey := common.Bytes2Hex(crypto.FromECDSAPub(&prvKey.PublicKey))
fmt.Println("pubKey:", pubKey) //fmt.Println("pubKey:", pubKey)
address := crypto.PubkeyToAddress(prvKey.PublicKey) //address := crypto.PubkeyToAddress(prvKey.PublicKey)
fmt.Println("address:", address) //fmt.Println("address:", address)
// JSON 2 数据
jsonData := `{
"completed_at": "2023-07-02T02:13:48.764861Z",
"output": ["ss","sss"]
}`
// 解析 JSON 数据到 map[string]json.RawMessage
var m map[string]json.RawMessage
if err := json.Unmarshal([]byte(jsonData), &m); err != nil {
fmt.Println("解析 JSON 数据时出错:", err)
return
}
// 解析 "output" 字段
//var output [][]string
//if err := json.Unmarshal(m["output"], &output); err != nil {
// fmt.Println("解析 output 字段时出错:", err)
// return
//}
var output []string
if err := json.Unmarshal(m["output"], &output); err != nil {
fmt.Println("解析 output 字段时出错:", err)
return
}
// 输出结果
fmt.Println("Output Type:", output)
} }
...@@ -96,14 +96,28 @@ func readPrivateKey() (*ecdsa.PrivateKey, error) { ...@@ -96,14 +96,28 @@ func readPrivateKey() (*ecdsa.PrivateKey, error) {
} }
// IsBase64ImageStr 检查字符串是否是 Base64 编码的图像数据 // IsBase64ImageStr 检查字符串是否是 Base64 编码的图像数据
func IsBase64ImageStr(imageStr string) bool { func IsBase64ImageStr(imageStr string) (bool, []byte, string, string) {
// 移除可能的前缀(如 "data:image/png;base64,") // 移除可能的前缀(如 "data:image/png;base64,")
imageStr = strings.SplitN(imageStr, ",", 2)[1] imageStrArr := strings.SplitN(imageStr, ",", 2)
_, err := base64.StdEncoding.DecodeString(imageStr) base64CodePrefix := ""
return err == nil base64Code := ""
if len(imageStrArr) == 1 {
base64Code = imageStrArr[0]
base64CodePrefix = "data:image/png;base64"
} else {
base64CodePrefix = imageStrArr[0]
base64Code = imageStrArr[1]
}
decodeBytes, err := base64.StdEncoding.DecodeString(base64Code)
if err != nil {
return false, nil, "", ""
}
formatStr := strings.Split(strings.Split(base64CodePrefix, ";")[0], ":")[1]
suffix := strings.Split(formatStr, "/")[1]
return true, decodeBytes, formatStr, suffix
} }
func MatchFileCacheQueryString(params map[string][]string, taskImageName string, modelsInfo []*models.ModelInfo, formatType string) string { func MatchFileCacheQueryString(params map[string][]string, taskImageName string, modelsInfo []*models.ModelInfo, contentType string) string {
values := url.Values{} values := url.Values{}
isExistFileExpires := false isExistFileExpires := false
for key, value := range params { for key, value := range params {
...@@ -126,7 +140,6 @@ func MatchFileCacheQueryString(params map[string][]string, taskImageName string, ...@@ -126,7 +140,6 @@ func MatchFileCacheQueryString(params map[string][]string, taskImageName string,
if !isModelExistFileExpires { if !isModelExistFileExpires {
values.Add(models.ResultFileExpiresDB, "600") values.Add(models.ResultFileExpiresDB, "600")
} }
contentType := strings.Split(strings.Split(formatType, ";")[0], ":")[1]
values.Add(models.ContentType, contentType) values.Add(models.ContentType, contentType)
return values.Encode() return values.Encode()
} }
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment