Commit 1407de9f authored by duanjinfei's avatar duanjinfei

handler container resp

parent 03c1cc46
......@@ -318,7 +318,7 @@ func handlerMsg(nodeManager *models.NodeManagerClient,
}
} else {
isSuccess = false
taskExecRes.TaskExecError = fmt.Sprintf("%s-%s", "Task exec error", taskExecRes.TaskExecError)
taskExecRes.TaskExecError = fmt.Sprintf("worker:%s-%s-%s", conf.GetConfig().SignPublicAddress.Hex(), "Task exec error", taskExecRes.TaskExecError)
}
reqHash, respHash, minerSign := taskMsgWorker.GetMinerSign(taskMsg, taskExecRes.TaskRespBody)
params := buildParams(taskMsg.TaskId, containerSign, minerSign, taskExecRes, isSuccess)
......
......@@ -109,7 +109,7 @@ func (t *TaskHandler) ComputeTaskHandler(taskMsg *nodeManagerV1.PushTaskMessage)
err := json.Unmarshal(bytes.NewBufferString(taskMsg.TaskCmd).Bytes(), taskCmd)
if err != nil {
log.Errorf("failed to unmarshal task cmd: %s", err.Error())
taskExecResult.TaskExecError = fmt.Sprintf("worker:%s,%s,%s", conf.GetConfig().SignPublicAddress.Hex(), "failed to unmarshal task cmd: %s", err.Error())
taskExecResult.TaskExecError = fmt.Sprintf("%s,%s", "failed to unmarshal task cmd: %s", err.Error())
t.ExecTaskIdIsSuccess.Store(taskMsg.TaskId, true)
return
}
......@@ -123,7 +123,7 @@ func (t *TaskHandler) ComputeTaskHandler(taskMsg *nodeManagerV1.PushTaskMessage)
log.Info("found task image finished")
if !isFound || imageId == "" {
log.Error("The image is not found:", taskCmd.ImageName)
taskExecResult.TaskExecError = fmt.Sprintf("worker:%s,%s,%s", conf.GetConfig().SignPublicAddress.Hex(), "The image is not found:", taskCmd.ImageName)
taskExecResult.TaskExecError = fmt.Sprintf("%s,%s", "The image is not found:", taskCmd.ImageName)
t.ExecTaskIdIsSuccess.Store(taskMsg.TaskId, true)
return
}
......@@ -146,7 +146,7 @@ func (t *TaskHandler) ComputeTaskHandler(taskMsg *nodeManagerV1.PushTaskMessage)
containerId, err := t.DockerOp.CreateAndStartContainer(taskCmd.ImageName, taskCmd.DockerCmd)
if err != nil {
log.Errorf("Create and start container failed: %s", err.Error())
taskExecResult.TaskExecError = fmt.Sprintf("worker:%s,%s,%s", conf.GetConfig().SignPublicAddress.Hex(), "Create and start container failed", err.Error())
taskExecResult.TaskExecError = fmt.Sprintf("%s,%s", "Create and start container failed", err.Error())
t.ExecTaskIdIsSuccess.Store(taskMsg.TaskId, true)
return
}
......@@ -174,7 +174,7 @@ func (t *TaskHandler) ComputeTaskHandler(taskMsg *nodeManagerV1.PushTaskMessage)
err = json.Unmarshal(taskMsg.TaskParam, taskParam)
if err != nil {
log.WithField("err", err).Error("Error unmarshalling task parameter")
taskExecResult.TaskExecError = fmt.Sprintf("worker:%s,%s,%s", conf.GetConfig().SignPublicAddress.Hex(), "Error unmarshalling task parameter", err.Error())
taskExecResult.TaskExecError = fmt.Sprintf("%s,%s", "Error unmarshalling task parameter", err.Error())
t.ExecTaskIdIsSuccess.Store(taskMsg.TaskId, true)
return
}
......@@ -186,7 +186,7 @@ func (t *TaskHandler) ComputeTaskHandler(taskMsg *nodeManagerV1.PushTaskMessage)
request, err := http.NewRequest("POST", taskCmd.ApiUrl, reqContainerBody)
if err != nil {
log.WithField("error:", err).Error("New container request failed")
taskExecResult.TaskExecError = fmt.Sprintf("worker:%s,%s,%s", conf.GetConfig().SignPublicAddress.Hex(), "Http client new container request failed", err.Error())
taskExecResult.TaskExecError = fmt.Sprintf("%s,%s", "Http client new container request failed", err.Error())
t.ExecTaskIdIsSuccess.Store(taskMsg.TaskId, true)
return
}
......@@ -199,20 +199,20 @@ func (t *TaskHandler) ComputeTaskHandler(taskMsg *nodeManagerV1.PushTaskMessage)
err := json.Unmarshal(taskParam.Body, m)
if err != nil {
log.WithError(err).Error("json unmarshal task body failed")
taskExecResult.TaskExecError = fmt.Sprintf("worker:%s,%s,%s", conf.GetConfig().SignPublicAddress.Hex(), "Json unmarshal task body failed", err.Error())
taskExecResult.TaskExecError = fmt.Sprintf("%s,%s", "Json unmarshal task body failed", err.Error())
t.ExecTaskIdIsSuccess.Store(taskMsg.TaskId, true)
return
}
if m.WebHook == "" {
log.Error("Request webhook is nil")
taskExecResult.TaskExecError = fmt.Sprintf("worker:%s,%s", conf.GetConfig().SignPublicAddress.Hex(), "Request webhook is nil")
taskExecResult.TaskExecError = fmt.Sprintf("%s", "Request webhook is nil")
t.ExecTaskIdIsSuccess.Store(taskMsg.TaskId, true)
return
} else {
_, err := url.Parse(m.WebHook)
if err != nil {
log.WithError(err).Error("web hook url parse failed")
taskExecResult.TaskExecError = fmt.Sprintf("worker:%s,%s,%s", conf.GetConfig().SignPublicAddress.Hex(), "Web hook url parse failed", err.Error())
taskExecResult.TaskExecError = fmt.Sprintf("%s,%s", "Web hook url parse failed", err.Error())
t.ExecTaskIdIsSuccess.Store(taskMsg.TaskId, true)
return
}
......@@ -224,7 +224,7 @@ func (t *TaskHandler) ComputeTaskHandler(taskMsg *nodeManagerV1.PushTaskMessage)
post, err := t.HttpClient.Do(request)
if err != nil {
log.WithField("error:", err).Error("Http client post request container failed")
taskExecResult.TaskExecError = fmt.Sprintf("worker:%s,%s,%s", conf.GetConfig().SignPublicAddress.Hex(), "Http client post request container failed", err.Error())
taskExecResult.TaskExecError = fmt.Sprintf("%s,%s", "Http client post request container failed", err.Error())
t.ExecTaskIdIsSuccess.Store(taskMsg.TaskId, true)
return
}
......@@ -235,7 +235,7 @@ func (t *TaskHandler) ComputeTaskHandler(taskMsg *nodeManagerV1.PushTaskMessage)
taskExecResult.TaskHttpStatusCode = http.StatusOK
readBody, err := io.ReadAll(post.Body)
if err != nil {
taskExecResult.TaskExecError = fmt.Sprintf("worker:%s,%s,%s,Container Http Code:%d", conf.GetConfig().SignPublicAddress.Hex(), "Read container body failed", err.Error(), post.StatusCode)
taskExecResult.TaskExecError = fmt.Sprintf("%s,%s,Container Http Code:%d", "Read container body failed", err.Error(), post.StatusCode)
log.Error("Read container body failed", err)
t.ExecTaskIdIsSuccess.Store(taskMsg.TaskId, true)
return
......@@ -260,6 +260,7 @@ func (t *TaskHandler) ComputeTaskHandler(taskMsg *nodeManagerV1.PushTaskMessage)
res := data.([][]string)
log.Info("data is [][]string type")
apiRes := make([][]string, 1)
isRedirect := false
for _, innerSlice := range res {
apiResOneArr := make([]string, 0)
for _, respStr := range innerSlice {
......@@ -270,11 +271,11 @@ func (t *TaskHandler) ComputeTaskHandler(taskMsg *nodeManagerV1.PushTaskMessage)
ossUri, err := t.getFileCache(respStr, taskMsg, taskParam, taskCmd)
if err != nil || ossUri == "" {
if err != nil {
taskExecResult.TaskExecError = fmt.Sprintf("worker:%s,%s,%s", conf.GetConfig().SignPublicAddress.Hex(), "Get file cache uri failed", err)
taskExecResult.TaskExecError = fmt.Sprintf("%s,%s", "Get file cache uri failed", err)
}
if ossUri == "" {
apiResOneArr = append(apiResOneArr, respStr)
apiResBody := utils.EncodeJsonEscapeHTML(apiRes)
taskExecResult.TaskRespBody = apiResBody
}
continue
}
if ossUri != "" && len(res) == 1 && len(innerSlice) == 1 {
......@@ -282,6 +283,7 @@ func (t *TaskHandler) ComputeTaskHandler(taskMsg *nodeManagerV1.PushTaskMessage)
apiResBody := utils.EncodeJsonEscapeHTML(ossUri)
taskExecResult.TaskRespBody = apiResBody
post.Header.Set("Location", ossUri)
isRedirect = true
break
} else {
apiResOneArr = append(apiResOneArr, ossUri)
......@@ -289,7 +291,7 @@ func (t *TaskHandler) ComputeTaskHandler(taskMsg *nodeManagerV1.PushTaskMessage)
}
apiRes = append(apiRes, apiResOneArr)
}
if len(apiRes) > 1 || len(apiRes[0]) > 1 {
if !isRedirect {
apiResBody := utils.EncodeJsonEscapeHTML(apiRes)
taskExecResult.TaskRespBody = apiResBody
}
......@@ -299,6 +301,7 @@ func (t *TaskHandler) ComputeTaskHandler(taskMsg *nodeManagerV1.PushTaskMessage)
res := data.([]string)
log.Info("data is []string type")
apiRes := make([]string, 0)
isRedirect := false
for _, respStr := range res {
if !isUseFileCache {
apiRes = append(apiRes, respStr)
......@@ -307,11 +310,11 @@ func (t *TaskHandler) ComputeTaskHandler(taskMsg *nodeManagerV1.PushTaskMessage)
ossUri, err := t.getFileCache(respStr, taskMsg, taskParam, taskCmd)
if err != nil || ossUri == "" {
if err != nil {
taskExecResult.TaskExecError = fmt.Sprintf("worker:%s,%s,%s", conf.GetConfig().SignPublicAddress.Hex(), "Get file cache uri failed", err)
taskExecResult.TaskExecError = fmt.Sprintf("%s,%s", "Get file cache uri failed", err)
}
if ossUri == "" {
apiRes = append(apiRes, respStr)
apiResBody := utils.EncodeJsonEscapeHTML(apiRes)
taskExecResult.TaskRespBody = apiResBody
}
continue
}
if ossUri != "" && len(res) == 1 {
......@@ -319,12 +322,13 @@ func (t *TaskHandler) ComputeTaskHandler(taskMsg *nodeManagerV1.PushTaskMessage)
post.Header.Set("Location", ossUri)
apiResBody := utils.EncodeJsonEscapeHTML(ossUri)
taskExecResult.TaskRespBody = apiResBody
isRedirect = true
break
} else {
apiRes = append(apiRes, ossUri)
}
}
if len(apiRes) > 1 {
if !isRedirect {
apiResBody := utils.EncodeJsonEscapeHTML(apiRes)
taskExecResult.TaskRespBody = apiResBody
}
......@@ -335,6 +339,7 @@ func (t *TaskHandler) ComputeTaskHandler(taskMsg *nodeManagerV1.PushTaskMessage)
log.Info("data is string type")
resArr := []string{resStr}
apiRes := make([]string, 0)
isRedirect := false
for _, respStr := range resArr {
if !isUseFileCache {
apiRes = append(apiRes, respStr)
......@@ -343,11 +348,11 @@ func (t *TaskHandler) ComputeTaskHandler(taskMsg *nodeManagerV1.PushTaskMessage)
ossUri, err := t.getFileCache(respStr, taskMsg, taskParam, taskCmd)
if err != nil || ossUri == "" {
if err != nil {
taskExecResult.TaskExecError = fmt.Sprintf("worker:%s,%s,%s", conf.GetConfig().SignPublicAddress.Hex(), "Get file cache uri failed", err)
taskExecResult.TaskExecError = fmt.Sprintf("%s,%s", "Get file cache uri failed", err)
}
if ossUri == "" {
apiRes = append(apiRes, respStr)
apiResBody := utils.EncodeJsonEscapeHTML(apiRes)
taskExecResult.TaskRespBody = apiResBody
}
continue
}
if ossUri != "" && len(resArr) == 1 {
......@@ -355,25 +360,26 @@ func (t *TaskHandler) ComputeTaskHandler(taskMsg *nodeManagerV1.PushTaskMessage)
post.Header.Set("Location", ossUri)
apiResBody := utils.EncodeJsonEscapeHTML(ossUri)
taskExecResult.TaskRespBody = apiResBody
isRedirect = true
break
} else {
apiRes = append(apiRes, ossUri)
}
}
if len(apiRes) > 1 {
if !isRedirect {
apiResBody := utils.EncodeJsonEscapeHTML(apiRes)
taskExecResult.TaskRespBody = apiResBody
}
}
default:
log.Error("data is unknown type", v)
taskExecResult.TaskExecError = fmt.Sprintf("worker:%s,%s", conf.GetConfig().SignPublicAddress.Hex(), "Container resp data is unknown type")
taskExecResult.TaskExecError = fmt.Sprintf("%s", "Container resp data is unknown type")
t.ExecTaskIdIsSuccess.Store(taskMsg.TaskId, true)
return
}
} else {
log.Error("Container resp output is nil")
taskExecResult.TaskExecError = fmt.Sprintf("worker:%s,%s", conf.GetConfig().SignPublicAddress.Hex(), "Container resp output is nil")
taskExecResult.TaskExecError = fmt.Sprintf("%s", "Container resp output is nil")
t.ExecTaskIdIsSuccess.Store(taskMsg.TaskId, true)
taskExecResult.TaskRespBody = readBody
return
......@@ -386,7 +392,7 @@ func (t *TaskHandler) ComputeTaskHandler(taskMsg *nodeManagerV1.PushTaskMessage)
headers, err := json.Marshal(post.Header)
if err != nil {
log.WithError(err).Error("JSON marshal container header failed")
taskExecResult.TaskExecError = fmt.Sprintf("worker:%s,%s,%s", conf.GetConfig().SignPublicAddress.Hex(), "JSON marshal container header failed", err.Error())
taskExecResult.TaskExecError = fmt.Sprintf("%s,%s", "JSON marshal container header failed", err.Error())
t.ExecTaskIdIsSuccess.Store(taskMsg.TaskId, true)
return
}
......@@ -400,17 +406,14 @@ func (t *TaskHandler) ComputeTaskHandler(taskMsg *nodeManagerV1.PushTaskMessage)
all, err := io.ReadAll(post.Body)
if err != nil {
log.Error("JSON read error: ", err)
taskExecResult.TaskExecError = fmt.Sprintf("worker:%s,%s,Container Http Code:%d,err:%s", conf.GetConfig().SignPublicAddress.Hex(), "Read container body failed", post.StatusCode, err)
taskExecResult.TaskExecError = fmt.Sprintf("%s,Container Http Code:%d,err:%s", "Read container body failed", post.StatusCode, err)
t.ExecTaskIdIsSuccess.Store(taskMsg.TaskId, true)
return
}
taskExecResult.TaskExecError = fmt.Sprintf("worker:%s,%s,Container Http Code:%d,body:%s", conf.GetConfig().SignPublicAddress.Hex(), "Container is exec failed", post.StatusCode, string(all))
taskExecResult.TaskExecError = fmt.Sprintf("%s,Container Http Code:%d,body:%s", "Container is exec failed", post.StatusCode, string(all))
} else {
taskExecResult.TaskExecError = fmt.Sprintf("worker:%s,%s,Container Http Code:%d,body:%s", conf.GetConfig().SignPublicAddress.Hex(), "Read container body failed", post.StatusCode, "")
taskExecResult.TaskExecError = fmt.Sprintf("%s,Container Http Code:%d,body:%s", "Read container body failed", post.StatusCode, "")
}
apiRes := make([]string, 0)
apiResBody := utils.EncodeJsonEscapeHTML(apiRes)
taskExecResult.TaskRespBody = apiResBody
log.WithField("error", post.Body).WithField("taskId", taskMsg.TaskId).Error("Exec task result is failed")
}
if taskMsg.TaskKind == baseV1.TaskKind_ComputeTask {
......@@ -427,7 +430,7 @@ func (t *TaskHandler) foundTaskImage(taskExecResult *models.TaskResult, taskCmd
images, err := t.DockerOp.PsImages()
if err != nil {
log.Error("Ps images failed:", err)
taskExecResult.TaskExecError = fmt.Sprintf("worker:%s,%s,%s", conf.GetConfig().SignPublicAddress.Hex(), "Ps images failed:", err.Error())
taskExecResult.TaskExecError = fmt.Sprintf("%s,%s", "Ps images failed:", err.Error())
t.ExecTaskIdIsSuccess.Store(taskMsg.TaskId, true)
isSuccess = false
imageId = ""
......@@ -613,7 +616,7 @@ func (t *TaskHandler) checkContainerHealthy(internalIp string, internalPort uint
healthyCheckResp, err := t.HttpClient.Get(healthCheckUrl)
if err != nil {
log.Errorf("Request container healthy failed: %s", err.Error())
taskExecResult.TaskExecError = fmt.Sprintf("worker:%s,%s,%s", conf.GetConfig().SignPublicAddress.Hex(), "Request container healthy failed", err.Error())
taskExecResult.TaskExecError = fmt.Sprintf("%s,%s", "Request container healthy failed", err.Error())
t.ExecTaskIdIsSuccess.Store(taskMsg.TaskId, true)
return false
}
......@@ -622,13 +625,13 @@ func (t *TaskHandler) checkContainerHealthy(internalIp string, internalPort uint
err = json.Unmarshal(body, m)
if err != nil {
log.Errorf("Json unmarshal container healthy body failed: %s", err.Error())
taskExecResult.TaskExecError = fmt.Sprintf("worker:%s,%s,%s", conf.GetConfig().SignPublicAddress.Hex(), "Json unmarshal container healthy body failed", err.Error())
taskExecResult.TaskExecError = fmt.Sprintf("%s,%s", "Json unmarshal container healthy body failed", err.Error())
t.ExecTaskIdIsSuccess.Store(taskMsg.TaskId, true)
return false
}
if m.Status != models.READY {
log.Errorf("The container is not ready")
taskExecResult.TaskExecError = fmt.Sprintf("worker:%s,%s", conf.GetConfig().SignPublicAddress.Hex(), "The container is not ready")
taskExecResult.TaskExecError = fmt.Sprintf("%s", "The container is not ready")
t.ExecTaskIdIsSuccess.Store(taskMsg.TaskId, true)
return false
}
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment