Commit 0a0dbbda authored by duanjinfei's avatar duanjinfei

update task handler

parent 559ca15b
......@@ -10,6 +10,8 @@ const (
ContentType = "type"
RedirectCode = 303
UseFileCache = "USE-FILE-CACHE"
Prefer = "Prefer"
Async = "respond-async"
ModelPublishStatusYes = 1
ModelPublishStatusNo = 2
)
......@@ -30,12 +30,17 @@ type TaskParam struct {
Body []byte `json:"body"`
}
type ContainerRequest struct {
WebHook string `json:"webhook"`
}
type TaskResult struct {
TaskHttpHeaders []byte
TaskExecTime int64
TaskHttpStatusCode int32
TaskRespBody []byte
TaskIsSuccess bool
TaskExecError string
}
type ApiResp struct {
......@@ -72,6 +77,12 @@ type ModelInfo struct {
PublishStatus int `json:"publish_status"`
}
type FileCacheResult struct {
Code int `json:"code"`
Msg string `json:"msg"`
Data string `json:"data"`
}
type ComputeResult struct {
Code string `json:"code"`
Msg string `json:"msg"`
......
......@@ -239,7 +239,8 @@ func SubmitResultResp(params ...interface{}) *nodemanagerV1.WorkerMessage {
TaskId: taskId,
ContainerSignature: containerSign,
MinerSignature: minerSign,
TaskResultCode: taskExecResult.TaskHttpStatusCode,
TaskExecuteCode: taskExecResult.TaskHttpStatusCode,
TaskExecuteError: taskExecResult.TaskExecError,
TaskResultHeader: taskExecResult.TaskHttpHeaders,
TaskExecuteDuration: uint64(taskExecResult.TaskExecTime),
IsSuccessed: isSuccess,
......
......@@ -8,6 +8,7 @@ import (
"example.com/m/operate"
"example.com/m/utils"
"example.com/m/validator"
"fmt"
nodeManagerV1 "github.com/odysseus/odysseus-protocol/gen/proto/go/nodemanager/v1"
"google.golang.org/grpc"
"time"
......@@ -293,6 +294,7 @@ func handlerMsg(nodeManager *models.NodeManagerClient,
TaskHttpHeaders: nil,
TaskIsSuccess: false,
TaskExecTime: 0,
TaskExecError: "",
}
if taskExecResInterface != nil {
taskExecRes = taskExecResInterface.(*models.TaskResult)
......@@ -302,6 +304,7 @@ func handlerMsg(nodeManager *models.NodeManagerClient,
if containerSign == nil || len(containerSign) == 0 {
log.Error("Container signing failed................")
isSuccess = false
taskExecRes.TaskExecError = fmt.Sprintf("%s,%s", taskExecRes.TaskExecError, "Container sign failed")
}
reqHash, respHash, minerSign := taskMsgWorker.GetMinerSign(taskMsg, taskExecRes.TaskRespBody)
params := buildParams(taskMsg.TaskId, containerSign, minerSign, taskExecRes, isSuccess)
......
......@@ -18,6 +18,7 @@ import (
"math/rand"
"mime/multipart"
"net/http"
"net/url"
"strconv"
"strings"
"sync"
......@@ -97,11 +98,13 @@ func (t *TaskHandler) ComputeTaskHandler(taskMsg *nodeManagerV1.PushTaskMessage)
TaskHttpHeaders: nil,
TaskIsSuccess: false,
TaskExecTime: 0,
TaskExecError: "",
}
taskCmd := &models.TaskCmd{}
err := json.Unmarshal(bytes.NewBufferString(taskMsg.TaskCmd).Bytes(), taskCmd)
if err != nil {
log.Errorf("failed to unmarshal task cmd: %s", err.Error())
taskExecResult.TaskExecError = fmt.Sprintf("%s,%s", "failed to unmarshal task cmd: %s", err.Error())
return
}
log.Info("received task cmd :", taskCmd)
......@@ -138,6 +141,7 @@ func (t *TaskHandler) ComputeTaskHandler(taskMsg *nodeManagerV1.PushTaskMessage)
images, err := t.DockerOp.PsImages()
if err != nil {
log.Error("Ps images failed:", err)
taskExecResult.TaskExecError = fmt.Sprintf("%s,%s", "Ps images failed:", err.Error())
return
}
imageId := ""
......@@ -157,6 +161,7 @@ func (t *TaskHandler) ComputeTaskHandler(taskMsg *nodeManagerV1.PushTaskMessage)
}
if !isFound {
log.Error("The image is not found:", taskCmd.ImageName)
taskExecResult.TaskExecError = fmt.Sprintf("%s,%s", "The image is not found:", taskCmd.ImageName)
return
}
running, internalIp, internalPort := t.foundImageIsRunning(imageId)
......@@ -178,6 +183,7 @@ func (t *TaskHandler) ComputeTaskHandler(taskMsg *nodeManagerV1.PushTaskMessage)
containerId, err := t.DockerOp.CreateAndStartContainer(taskCmd.ImageName, taskCmd.DockerCmd)
if err != nil {
log.Errorf("Create and start container failed: %s", err.Error())
taskExecResult.TaskExecError = fmt.Sprintf("%s,%s", "Create and start container failed: %s", err.Error())
return
}
log.Infof("Started container with ID %s", containerId)
......@@ -198,6 +204,7 @@ func (t *TaskHandler) ComputeTaskHandler(taskMsg *nodeManagerV1.PushTaskMessage)
err = json.Unmarshal(taskMsg.TaskParam, taskParam)
if err != nil {
log.WithField("err", err).Error("Error unmarshalling task parameter")
taskExecResult.TaskExecError = fmt.Sprintf("%s,%s", "Error unmarshalling task parameter", err.Error())
return
}
reqContainerBody := bytes.NewReader(taskParam.Body)
......@@ -205,9 +212,44 @@ func (t *TaskHandler) ComputeTaskHandler(taskMsg *nodeManagerV1.PushTaskMessage)
queryString := utils.MatchContainerQueryString(taskParam.Queries)
taskCmd.ApiUrl = fmt.Sprintf("%s?%s", taskCmd.ApiUrl, queryString)
}
post, err := t.HttpClient.Post(taskCmd.ApiUrl, "application/json", reqContainerBody)
request, err := http.NewRequest("POST", taskCmd.ApiUrl, reqContainerBody)
request.Header.Set("Content-Type", "application/json")
for key, value := range taskParam.Headers {
if key == models.Prefer {
if value[0] == models.Async {
request.Header.Set(models.Prefer, models.Async)
m := &models.ContainerRequest{}
err := json.Unmarshal(taskParam.Body, m)
if err != nil {
log.WithError(err).Error("json unmarshal task body failed")
taskExecResult.TaskExecError = fmt.Sprintf("%s,%s", "Json unmarshal task body failed", err.Error())
return
}
if m.WebHook == "" {
log.Error("Request webhook is nil")
taskExecResult.TaskExecError = fmt.Sprintf("%s", "Request webhook is nil")
return
} else {
_, err := url.Parse(m.WebHook)
if err != nil {
log.WithError(err).Error("web hook url parse failed")
taskExecResult.TaskExecError = fmt.Sprintf("%s,%s", "web hook url parse failed", err.Error())
return
}
}
break
}
}
}
if err != nil {
log.WithField("error:", err).Error("New container request failed")
taskExecResult.TaskExecError = fmt.Sprintf("%s,%s", "Http client post request container failed", err.Error())
return
}
post, err := t.HttpClient.Do(request)
if err != nil {
log.WithField("error:", err).Error("Http client post request container failed")
taskExecResult.TaskExecError = fmt.Sprintf("%s,%s", "Http client post request container failed", err.Error())
return
}
endAfterTaskTime := time.Since(startBeforeTaskTime)
......@@ -217,10 +259,10 @@ func (t *TaskHandler) ComputeTaskHandler(taskMsg *nodeManagerV1.PushTaskMessage)
taskExecResult.TaskHttpStatusCode = http.StatusOK
readBody, err := io.ReadAll(post.Body)
if err != nil {
log.Error("received error: ", err)
taskExecResult.TaskExecError = fmt.Sprintf("%s,%s,Container Http Code:%d", "Read container body failed", err.Error(), post.StatusCode)
log.Error("Read container body failed", err)
return
}
taskExecResult.TaskRespBody = readBody
isUseFileCache := true
if taskMsg.TaskKind != baseV1.TaskKind_StandardTask {
for key, value := range taskParam.Headers {
......@@ -233,86 +275,98 @@ func (t *TaskHandler) ComputeTaskHandler(taskMsg *nodeManagerV1.PushTaskMessage)
}
log.WithField("isUseFileCache", isUseFileCache).Info("is use file cache")
if readBody != nil {
containerRespStrArr := make([]string, 0)
data := parseData(readBody)
if data != nil {
switch v := data.(type) {
case [][]string:
{
res := data.([][]string)
for _, innerSlice := range res {
for _, str := range innerSlice {
containerRespStrArr = append(containerRespStrArr, str)
log.Info("data is [][]string type")
apiRes := make([][]string, 1)
for i, innerSlice := range res {
for j, respStr := range innerSlice {
ossUri, err := t.getFileCache(respStr, taskMsg, taskParam, taskCmd)
if err != nil || ossUri == "" {
apiRes[i][j] = respStr
continue
}
if ossUri != "" && len(res) == 1 && len(innerSlice) == 1 && isUseFileCache {
taskExecResult.TaskHttpStatusCode = models.RedirectCode
post.Header.Set("Location", ossUri)
break
} else {
apiRes[i][j] = ossUri
}
}
}
log.Info("data is [][]string type")
if len(apiRes) > 1 || len(apiRes[0]) > 1 {
apiResBody := utils.EncodeJsonEscapeHTML(apiRes)
taskExecResult.TaskRespBody = apiResBody
}
}
case []string:
{
res := data.([]string)
for _, slice := range res {
containerRespStrArr = append(containerRespStrArr, slice)
}
log.Info("data is []string type")
apiRes := make([]string, 0)
for _, respStr := range res {
ossUri, err := t.getFileCache(respStr, taskMsg, taskParam, taskCmd)
if err != nil || ossUri == "" {
apiRes = append(apiRes, respStr)
continue
}
if ossUri != "" && len(res) == 1 && isUseFileCache {
taskExecResult.TaskHttpStatusCode = models.RedirectCode
post.Header.Set("Location", ossUri)
break
} else {
apiRes = append(apiRes, respStr)
}
}
if len(apiRes) > 1 {
apiResBody := utils.EncodeJsonEscapeHTML(apiRes)
taskExecResult.TaskRespBody = apiResBody
}
}
case string:
{
res := data.(string)
containerRespStrArr = append(containerRespStrArr, res)
resStr := data.(string)
log.Info("data is string type")
}
default:
log.Error("data is unknown type", v)
}
apiRes := make([]string, 0)
for _, respStr := range containerRespStrArr {
isBase64, decodeByte, respFormat, suffix := utils.IsBase64ImageStr(respStr)
if isBase64 && isUseFileCache {
log.WithField("taskId", taskMsg.TaskId).WithField("format", respFormat).WithField("suffix", suffix).Info("Parse container resp")
queryString := utils.MatchFileCacheQueryString(taskParam.Headers, taskCmd.ImageName, t.DockerOp.ModelsInfo, respFormat)
ossUri, err := t.uploadOSS(taskMsg.TaskId, queryString, decodeByte, suffix)
if err != nil {
log.WithError(err).Error("upload image into file cache failed")
apiRes = append(apiRes, respStr)
continue
}
log.WithField("uri", ossUri).Info("upload image OSS successful")
if ossUri != "" && len(containerRespStrArr) == 1 {
taskExecResult.TaskHttpStatusCode = models.RedirectCode
post.Header.Set("Location", ossUri)
resArr := []string{resStr}
apiRes := make([]string, 0)
for _, respStr := range resArr {
ossUri, err := t.getFileCache(respStr, taskMsg, taskParam, taskCmd)
if err != nil || ossUri == "" {
apiRes = append(apiRes, respStr)
continue
}
if ossUri != "" && len(resArr) == 1 && isUseFileCache {
taskExecResult.TaskHttpStatusCode = models.RedirectCode
post.Header.Set("Location", ossUri)
break
} else {
apiRes = append(apiRes, ossUri)
}
}
if ossUri != "" {
apiRes = append(apiRes, ossUri)
if len(apiRes) > 1 {
apiResBody := utils.EncodeJsonEscapeHTML(apiRes)
taskExecResult.TaskRespBody = apiResBody
}
} else {
apiRes = append(apiRes, respStr)
}
}
if len(apiRes) > 1 {
res := &models.ApiResp{
IsSuccess: true,
TaskResult: apiRes,
}
apiResBody := bytes.NewBuffer([]byte{})
encoder := json.NewEncoder(apiResBody)
encoder.SetEscapeHTML(false)
err := encoder.Encode(res)
if err != nil {
log.WithError(err).Error("encoder Encode")
}
//apiResBody, err := json.Marshal(res)
//if err != nil {
// log.WithError(err).Error("json marshal upload oss uri")
//}
log.WithField("apiResBody", string(apiResBody.Bytes())).Info("model resp")
taskExecResult.TaskRespBody = apiResBody.Bytes()
default:
log.Error("data is unknown type", v)
taskExecResult.TaskExecError = "Container resp data is unknown type"
apiRes := make([]string, 0)
apiResBody := utils.EncodeJsonEscapeHTML(apiRes)
taskExecResult.TaskRespBody = apiResBody
}
}
}
}
headers, err := json.Marshal(post.Header)
if err != nil {
log.Error("JSON marshal header error: ", err)
log.WithError(err).Error("JSON marshal container header failed")
taskExecResult.TaskExecError = fmt.Sprintf("%s,%s", "JSON marshal container header failed", err.Error())
return
}
log.WithField("headers", post.Header).Info("return task http headers")
......@@ -327,10 +381,13 @@ func (t *TaskHandler) ComputeTaskHandler(taskMsg *nodeManagerV1.PushTaskMessage)
log.Error("JSON read error: ", err)
return
}
taskExecResult.TaskRespBody = all
taskExecResult.TaskExecError = fmt.Sprintf("%s,Container Http Code:%d,body:%s", "Read container body failed", post.StatusCode, string(all))
} else {
taskExecResult.TaskRespBody = nil
taskExecResult.TaskExecError = fmt.Sprintf("%s,Container Http Code:%d,body:%s", "Read container body failed", post.StatusCode, "")
}
apiRes := make([]string, 0)
apiResBody := utils.EncodeJsonEscapeHTML(apiRes)
taskExecResult.TaskRespBody = apiResBody
log.WithField("error", post.Body).WithField("taskId", taskMsg.TaskId).Error("Exec task result is failed")
}
if taskMsg.TaskKind == baseV1.TaskKind_ComputeTask {
......@@ -421,7 +478,39 @@ func (t *TaskHandler) uploadOSS(taskId string, queries string, decodedImage []by
log.WithError(err).Error("Error read oss resp body failed")
return "", err
}
return bytes.NewBuffer(ossRespBody).String(), nil
fileCacheRes := &models.FileCacheResult{}
err = json.Unmarshal(ossRespBody, fileCacheRes)
if err != nil {
log.WithError(err).Error("Json unmarshal file cache result failed")
return "", err
}
log.WithField("code", fileCacheRes.Code).WithField("msg", fileCacheRes.Msg).WithField("data", fileCacheRes.Data).Info("file cache result")
if fileCacheRes.Code == http.StatusOK && fileCacheRes.Data != "" {
_, err := url.Parse(fileCacheRes.Data)
if err != nil {
log.WithError(err).Error("url parse file cache data error")
return "", err
}
return fileCacheRes.Data, nil
}
return "", err
}
func (t *TaskHandler) getFileCache(respStr string, taskMsg *nodeManagerV1.PushTaskMessage, taskParam *models.TaskParam, taskCmd *models.TaskCmd) (string, error) {
isBase64, decodeByte, respFormat, suffix := utils.IsBase64ImageStr(respStr)
log.WithField("isBase64", isBase64).WithField("decodeByte", decodeByte).Info("resp str info")
if isBase64 {
log.WithField("taskId", taskMsg.TaskId).WithField("format", respFormat).WithField("suffix", suffix).Info("Parse container resp")
queryString := utils.MatchFileCacheQueryString(taskParam.Headers, taskCmd.ImageName, t.DockerOp.ModelsInfo, respFormat)
ossUri, err := t.uploadOSS(taskMsg.TaskId, queryString, decodeByte, suffix)
if err != nil || ossUri == "" {
log.WithError(err).Error("upload image into file cache failed")
return "", err
}
log.WithField("uri", ossUri).Info("upload image OSS successful")
return ossUri, nil
}
return "", nil
}
func parseData(readBody []byte) interface{} {
......
package utils
import (
"bytes"
"crypto/ecdsa"
"crypto/elliptic"
"crypto/rand"
"encoding/base64"
"encoding/json"
"example.com/m/log"
"example.com/m/models"
"fmt"
......@@ -110,6 +112,7 @@ func IsBase64ImageStr(imageStr string) (bool, []byte, string, string) {
}
decodeBytes, err := base64.StdEncoding.DecodeString(base64Code)
if err != nil {
log.WithError(err).Error("base64 decode string failed")
return false, nil, "", ""
}
formatStr := strings.Split(strings.Split(base64CodePrefix, ";")[0], ":")[1]
......@@ -151,3 +154,16 @@ func MatchContainerQueryString(params map[string]string) string {
}
return values.Encode()
}
func EncodeJsonEscapeHTML(apiRes any) []byte {
apiResBody := bytes.NewBuffer([]byte{})
encoder := json.NewEncoder(apiResBody)
encoder.SetEscapeHTML(false)
err := encoder.Encode(apiRes)
if err != nil {
log.WithError(err).Error("encoder Encode")
return apiResBody.Bytes()
}
log.WithField("apiResBody", string(apiResBody.Bytes())).Info("model resp")
return apiResBody.Bytes()
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment