Commit a002bc52 authored by duanjinfei's avatar duanjinfei

handler task exec

parent 58123d00
......@@ -12,6 +12,9 @@ const (
UseFileCache = "USE-FILE-CACHE"
Prefer = "Prefer"
Async = "respond-async"
HealthCheckAPI = "/health-check"
ReplicateImageNameSuffix = "docker.agicoin.ai/agicoin"
READY = "READY"
ModelPublishStatusYes = 1
ModelPublishStatusNo = 2
)
......@@ -77,6 +77,10 @@ type ModelInfo struct {
PublishStatus int `json:"publish_status"`
}
type HealthyCheck struct {
Status string `json:"status"`
}
type FileCacheResult struct {
Code int `json:"code"`
Msg string `json:"msg"`
......
......@@ -303,12 +303,17 @@ func handlerMsg(nodeManager *models.NodeManagerClient,
taskExecRes = taskExecResInterface.(*models.TaskResult)
}
isSuccess := taskExecRes.TaskIsSuccess
containerSign := taskMsgWorker.DockerOp.GetContainerSign(taskMsg, taskExecRes.TaskRespBody)
containerSign := make([]byte, 0)
if taskExecRes.TaskRespBody != nil {
containerSign = taskMsgWorker.DockerOp.GetContainerSign(taskMsg, taskExecRes.TaskRespBody)
if containerSign == nil || len(containerSign) == 0 {
log.Error("Container signing failed................")
isSuccess = false
taskExecRes.TaskExecError = fmt.Sprintf("%s-%s", "Container sign failed", taskExecRes.TaskExecError)
}
} else {
taskExecRes.TaskExecError = fmt.Sprintf("%s-%s", "Task exec error", taskExecRes.TaskExecError)
}
reqHash, respHash, minerSign := taskMsgWorker.GetMinerSign(taskMsg, taskExecRes.TaskRespBody)
params := buildParams(taskMsg.TaskId, containerSign, minerSign, taskExecRes, isSuccess)
taskMsgWorker.Mutex.Lock()
......
......@@ -94,40 +94,8 @@ func (t *TaskHandler) SystemTaskHandler(taskMsg *nodeManagerV1.PushTaskMessage)
func (t *TaskHandler) ComputeTaskHandler(taskMsg *nodeManagerV1.PushTaskMessage) {
defer t.Wg.Done()
if taskMsg.TaskKind == baseV1.TaskKind_ComputeTask {
t.IsExecAiTask = true
if t.IsExecStandardTask {
//todo: 停止标准任务容器
//containers := t.DockerOp.ListContainer()
//for _, container := range containers {
// if container.Image == taskCmd.ImageName && container.State == "running" {
// t.DockerOp.StopContainer(container.ID)
// }
//}
t.IsExecStandardTask = false
}
} else if taskMsg.TaskKind == baseV1.TaskKind_StandardTask {
t.IsExecStandardTask = true
}
if oldTaskId != taskMsg.TaskId {
for {
if oldTaskId == "" {
oldTaskId = taskMsg.TaskId
break
}
value, ok := t.ExecTaskIdIsSuccess.Load(oldTaskId)
log.WithField("isSuccess", value).Info("Task id exec info")
if !ok {
log.Warn("First exec task")
break
}
isSuccess := value.(bool)
if isSuccess {
oldTaskId = taskMsg.TaskId
break
}
}
}
t.checkLastTaskExecStatus(taskMsg)
log.Info("check last task exec status successful")
taskExecResult := &models.TaskResult{
TaskHttpStatusCode: 200,
TaskRespBody: nil,
......@@ -143,52 +111,19 @@ func (t *TaskHandler) ComputeTaskHandler(taskMsg *nodeManagerV1.PushTaskMessage)
err := json.Unmarshal(bytes.NewBufferString(taskMsg.TaskCmd).Bytes(), taskCmd)
if err != nil {
log.Errorf("failed to unmarshal task cmd: %s", err.Error())
taskExecResult.TaskExecError = fmt.Sprintf("%s,%s", "failed to unmarshal task cmd: %s", err.Error())
taskExecResult.TaskExecError = fmt.Sprintf("worker:%s,%s,%s", conf.GetConfig().SignPublicAddress.Hex(), "failed to unmarshal task cmd: %s", err.Error())
t.ExecTaskIdIsSuccess.Store(taskMsg.TaskId, true)
return
}
log.Info("received task cmd :", taskCmd)
log.WithField("oldTaskImageName", oldTaskImageName).WithField("newTaskImageName", taskCmd.ImageName).Info("task image info")
if oldTaskImageName != "" && oldTaskImageName != taskCmd.ImageName {
//todo: 停止标准任务容器
containers := t.DockerOp.ListContainer()
for _, container := range containers {
split := strings.Split(container.Image, ":")
if len(split) == 1 {
container.Image = fmt.Sprintf("%s:%s", container.Image, "latest")
}
if container.Image == taskCmd.ImageName && container.State == "running" {
t.DockerOp.StopContainer(container.ID)
log.WithField("Image name", container.Image).Info("Stopping container")
}
}
oldTaskImageName = taskCmd.ImageName
}
images, err := t.DockerOp.PsImages()
if err != nil {
log.Error("Ps images failed:", err)
taskExecResult.TaskExecError = fmt.Sprintf("%s,%s", "Ps images failed:", err.Error())
t.ExecTaskIdIsSuccess.Store(taskMsg.TaskId, true)
return
}
imageId := ""
isFound := false
for _, image := range images {
if isFound {
break
}
for _, tag := range image.RepoTags {
if tag == taskCmd.ImageName {
imageId = image.ID
isFound = true
log.Info("The image found success:", image.ID)
break
}
}
}
if !isFound {
t.checkIsStopContainer(taskCmd)
log.Info("check is stop container finished")
isFound, imageId := t.foundTaskImage(taskExecResult, taskCmd, taskMsg)
log.Info("found task image finished")
if !isFound || imageId == "" {
log.Error("The image is not found:", taskCmd.ImageName)
taskExecResult.TaskExecError = fmt.Sprintf("%s,%s", "The image is not found:", taskCmd.ImageName)
taskExecResult.TaskExecError = fmt.Sprintf("worker:%s,%s,%s", conf.GetConfig().SignPublicAddress.Hex(), "The image is not found:", taskCmd.ImageName)
t.ExecTaskIdIsSuccess.Store(taskMsg.TaskId, true)
return
}
......@@ -211,14 +146,20 @@ func (t *TaskHandler) ComputeTaskHandler(taskMsg *nodeManagerV1.PushTaskMessage)
containerId, err := t.DockerOp.CreateAndStartContainer(taskCmd.ImageName, taskCmd.DockerCmd)
if err != nil {
log.Errorf("Create and start container failed: %s", err.Error())
taskExecResult.TaskExecError = fmt.Sprintf("%s,%s", "Create and start container failed", err.Error())
taskExecResult.TaskExecError = fmt.Sprintf("worker:%s,%s,%s", conf.GetConfig().SignPublicAddress.Hex(), "Create and start container failed", err.Error())
t.ExecTaskIdIsSuccess.Store(taskMsg.TaskId, true)
return
}
log.Infof("Started container with ID %s", containerId)
time.Sleep(time.Second * 40)
time.Sleep(time.Second * 60)
running, internalIp, internalPort = t.foundImageIsRunning(imageId)
if running {
isMatch := strings.HasPrefix(taskCmd.ImageName, models.ReplicateImageNameSuffix)
if isMatch {
if !t.checkContainerHealthy(internalIp, internalPort, taskMsg, taskExecResult) {
return
}
}
taskCmd.ApiUrl = fmt.Sprintf("http://%s:%d%s", internalIp, internalPort, taskCmd.ApiUrl)
log.Info("Container ports:", internalPort)
log.WithField("ApiUrl", taskCmd.ApiUrl).Info("The image is not running")
......@@ -233,7 +174,7 @@ func (t *TaskHandler) ComputeTaskHandler(taskMsg *nodeManagerV1.PushTaskMessage)
err = json.Unmarshal(taskMsg.TaskParam, taskParam)
if err != nil {
log.WithField("err", err).Error("Error unmarshalling task parameter")
taskExecResult.TaskExecError = fmt.Sprintf("%s,%s", "Error unmarshalling task parameter", err.Error())
taskExecResult.TaskExecError = fmt.Sprintf("worker:%s,%s,%s", conf.GetConfig().SignPublicAddress.Hex(), "Error unmarshalling task parameter", err.Error())
t.ExecTaskIdIsSuccess.Store(taskMsg.TaskId, true)
return
}
......@@ -245,7 +186,7 @@ func (t *TaskHandler) ComputeTaskHandler(taskMsg *nodeManagerV1.PushTaskMessage)
request, err := http.NewRequest("POST", taskCmd.ApiUrl, reqContainerBody)
if err != nil {
log.WithField("error:", err).Error("New container request failed")
taskExecResult.TaskExecError = fmt.Sprintf("%s,%s", "Http client new container request failed", err.Error())
taskExecResult.TaskExecError = fmt.Sprintf("worker:%s,%s,%s", conf.GetConfig().SignPublicAddress.Hex(), "Http client new container request failed", err.Error())
t.ExecTaskIdIsSuccess.Store(taskMsg.TaskId, true)
return
}
......@@ -258,20 +199,20 @@ func (t *TaskHandler) ComputeTaskHandler(taskMsg *nodeManagerV1.PushTaskMessage)
err := json.Unmarshal(taskParam.Body, m)
if err != nil {
log.WithError(err).Error("json unmarshal task body failed")
taskExecResult.TaskExecError = fmt.Sprintf("%s,%s", "Json unmarshal task body failed", err.Error())
taskExecResult.TaskExecError = fmt.Sprintf("worker:%s,%s,%s", conf.GetConfig().SignPublicAddress.Hex(), "Json unmarshal task body failed", err.Error())
t.ExecTaskIdIsSuccess.Store(taskMsg.TaskId, true)
return
}
if m.WebHook == "" {
log.Error("Request webhook is nil")
taskExecResult.TaskExecError = fmt.Sprintf("%s", "Request webhook is nil")
taskExecResult.TaskExecError = fmt.Sprintf("worker:%s,%s", conf.GetConfig().SignPublicAddress.Hex(), "Request webhook is nil")
t.ExecTaskIdIsSuccess.Store(taskMsg.TaskId, true)
return
} else {
_, err := url.Parse(m.WebHook)
if err != nil {
log.WithError(err).Error("web hook url parse failed")
taskExecResult.TaskExecError = fmt.Sprintf("%s,%s", "Web hook url parse failed", err.Error())
taskExecResult.TaskExecError = fmt.Sprintf("worker:%s,%s,%s", conf.GetConfig().SignPublicAddress.Hex(), "Web hook url parse failed", err.Error())
t.ExecTaskIdIsSuccess.Store(taskMsg.TaskId, true)
return
}
......@@ -283,7 +224,7 @@ func (t *TaskHandler) ComputeTaskHandler(taskMsg *nodeManagerV1.PushTaskMessage)
post, err := t.HttpClient.Do(request)
if err != nil {
log.WithField("error:", err).Error("Http client post request container failed")
taskExecResult.TaskExecError = fmt.Sprintf("%s,%s", "Http client post request container failed", err.Error())
taskExecResult.TaskExecError = fmt.Sprintf("worker:%s,%s,%s", conf.GetConfig().SignPublicAddress.Hex(), "Http client post request container failed", err.Error())
t.ExecTaskIdIsSuccess.Store(taskMsg.TaskId, true)
return
}
......@@ -294,7 +235,7 @@ func (t *TaskHandler) ComputeTaskHandler(taskMsg *nodeManagerV1.PushTaskMessage)
taskExecResult.TaskHttpStatusCode = http.StatusOK
readBody, err := io.ReadAll(post.Body)
if err != nil {
taskExecResult.TaskExecError = fmt.Sprintf("%s,%s,Container Http Code:%d", "Read container body failed", err.Error(), post.StatusCode)
taskExecResult.TaskExecError = fmt.Sprintf("worker:%s,%s,%s,Container Http Code:%d", conf.GetConfig().SignPublicAddress.Hex(), "Read container body failed", err.Error(), post.StatusCode)
log.Error("Read container body failed", err)
t.ExecTaskIdIsSuccess.Store(taskMsg.TaskId, true)
return
......@@ -329,7 +270,7 @@ func (t *TaskHandler) ComputeTaskHandler(taskMsg *nodeManagerV1.PushTaskMessage)
ossUri, err := t.getFileCache(respStr, taskMsg, taskParam, taskCmd)
if err != nil || ossUri == "" {
if err != nil {
taskExecResult.TaskExecError = fmt.Sprintf("%s,%s", "Get file cache uri failed", err)
taskExecResult.TaskExecError = fmt.Sprintf("worker:%s,%s,%s", conf.GetConfig().SignPublicAddress.Hex(), "Get file cache uri failed", err)
}
apiResOneArr = append(apiResOneArr, respStr)
apiResBody := utils.EncodeJsonEscapeHTML(apiRes)
......@@ -366,7 +307,7 @@ func (t *TaskHandler) ComputeTaskHandler(taskMsg *nodeManagerV1.PushTaskMessage)
ossUri, err := t.getFileCache(respStr, taskMsg, taskParam, taskCmd)
if err != nil || ossUri == "" {
if err != nil {
taskExecResult.TaskExecError = fmt.Sprintf("%s,%s", "Get file cache uri failed", err)
taskExecResult.TaskExecError = fmt.Sprintf("worker:%s,%s,%s", conf.GetConfig().SignPublicAddress.Hex(), "Get file cache uri failed", err)
}
apiRes = append(apiRes, respStr)
apiResBody := utils.EncodeJsonEscapeHTML(apiRes)
......@@ -402,7 +343,7 @@ func (t *TaskHandler) ComputeTaskHandler(taskMsg *nodeManagerV1.PushTaskMessage)
ossUri, err := t.getFileCache(respStr, taskMsg, taskParam, taskCmd)
if err != nil || ossUri == "" {
if err != nil {
taskExecResult.TaskExecError = fmt.Sprintf("%s,%s", "Get file cache uri failed", err)
taskExecResult.TaskExecError = fmt.Sprintf("worker:%s,%s,%s", conf.GetConfig().SignPublicAddress.Hex(), "Get file cache uri failed", err)
}
apiRes = append(apiRes, respStr)
apiResBody := utils.EncodeJsonEscapeHTML(apiRes)
......@@ -426,7 +367,7 @@ func (t *TaskHandler) ComputeTaskHandler(taskMsg *nodeManagerV1.PushTaskMessage)
}
default:
log.Error("data is unknown type", v)
taskExecResult.TaskExecError = "Container resp data is unknown type"
taskExecResult.TaskExecError = fmt.Sprintf("worker:%s,%s", conf.GetConfig().SignPublicAddress.Hex(), "Container resp data is unknown type")
apiRes := make([]string, 0)
apiResBody := utils.EncodeJsonEscapeHTML(apiRes)
taskExecResult.TaskRespBody = apiResBody
......@@ -440,7 +381,7 @@ func (t *TaskHandler) ComputeTaskHandler(taskMsg *nodeManagerV1.PushTaskMessage)
headers, err := json.Marshal(post.Header)
if err != nil {
log.WithError(err).Error("JSON marshal container header failed")
taskExecResult.TaskExecError = fmt.Sprintf("%s,%s", "JSON marshal container header failed", err.Error())
taskExecResult.TaskExecError = fmt.Sprintf("worker:%s,%s,%s", conf.GetConfig().SignPublicAddress.Hex(), "JSON marshal container header failed", err.Error())
t.ExecTaskIdIsSuccess.Store(taskMsg.TaskId, true)
return
}
......@@ -454,13 +395,13 @@ func (t *TaskHandler) ComputeTaskHandler(taskMsg *nodeManagerV1.PushTaskMessage)
all, err := io.ReadAll(post.Body)
if err != nil {
log.Error("JSON read error: ", err)
taskExecResult.TaskExecError = fmt.Sprintf("%s,Container Http Code:%d,err:%s", "Read container body failed", post.StatusCode, err)
taskExecResult.TaskExecError = fmt.Sprintf("worker:%s,%s,Container Http Code:%d,err:%s", conf.GetConfig().SignPublicAddress.Hex(), "Read container body failed", post.StatusCode, err)
t.ExecTaskIdIsSuccess.Store(taskMsg.TaskId, true)
return
}
taskExecResult.TaskExecError = fmt.Sprintf("%s,Container Http Code:%d,body:%s", "Container is exec failed", post.StatusCode, string(all))
taskExecResult.TaskExecError = fmt.Sprintf("worker:%s,%s,Container Http Code:%d,body:%s", conf.GetConfig().SignPublicAddress.Hex(), "Container is exec failed", post.StatusCode, string(all))
} else {
taskExecResult.TaskExecError = fmt.Sprintf("%s,Container Http Code:%d,body:%s", "Read container body failed", post.StatusCode, "")
taskExecResult.TaskExecError = fmt.Sprintf("worker:%s,%s,Container Http Code:%d,body:%s", conf.GetConfig().SignPublicAddress.Hex(), "Read container body failed", post.StatusCode, "")
}
apiRes := make([]string, 0)
apiResBody := utils.EncodeJsonEscapeHTML(apiRes)
......@@ -477,6 +418,34 @@ func (t *TaskHandler) ComputeTaskHandler(taskMsg *nodeManagerV1.PushTaskMessage)
log.Info("received computeTask--------------------------------")
}
func (t *TaskHandler) foundTaskImage(taskExecResult *models.TaskResult, taskCmd *models.TaskCmd, taskMsg *nodeManagerV1.PushTaskMessage) (isSuccess bool, imageId string) {
images, err := t.DockerOp.PsImages()
if err != nil {
log.Error("Ps images failed:", err)
taskExecResult.TaskExecError = fmt.Sprintf("worker:%s,%s,%s", conf.GetConfig().SignPublicAddress.Hex(), "Ps images failed:", err.Error())
t.ExecTaskIdIsSuccess.Store(taskMsg.TaskId, true)
isSuccess = false
imageId = ""
return
}
isFound := false
for _, image := range images {
if isFound {
break
}
for _, tag := range image.RepoTags {
if tag == taskCmd.ImageName {
imageId = image.ID
isFound = true
log.Info("The image found success:", image.ID)
break
}
}
}
isSuccess = isFound
return
}
func (t *TaskHandler) CustomTaskHandler(taskMsg *nodeManagerV1.PushTaskMessage) {
defer t.Wg.Done()
_, err := t.DockerOp.PsImages()
......@@ -589,6 +558,91 @@ func (t *TaskHandler) getFileCache(respStr string, taskMsg *nodeManagerV1.PushTa
return "", nil
}
func (t *TaskHandler) checkLastTaskExecStatus(taskMsg *nodeManagerV1.PushTaskMessage) {
if taskMsg.TaskKind == baseV1.TaskKind_ComputeTask {
t.IsExecAiTask = true
if t.IsExecStandardTask {
//todo: 停止标准任务容器
//containers := t.DockerOp.ListContainer()
//for _, container := range containers {
// if container.Image == taskCmd.ImageName && container.State == "running" {
// t.DockerOp.StopContainer(container.ID)
// }
//}
t.IsExecStandardTask = false
}
} else if taskMsg.TaskKind == baseV1.TaskKind_StandardTask {
t.IsExecStandardTask = true
}
if oldTaskId != taskMsg.TaskId {
for {
if oldTaskId == "" {
oldTaskId = taskMsg.TaskId
break
}
value, ok := t.ExecTaskIdIsSuccess.Load(oldTaskId)
log.WithField("isSuccess", value).Info("Task id exec info")
if !ok {
log.WithField("task id", oldTaskId).Warn("task exec is not finished")
continue
}
isSuccess := value.(bool)
if isSuccess {
oldTaskId = taskMsg.TaskId
break
}
}
}
}
func (t *TaskHandler) checkContainerHealthy(internalIp string, internalPort uint16, taskMsg *nodeManagerV1.PushTaskMessage, taskExecResult *models.TaskResult) bool {
healthCheckUrl := fmt.Sprintf("http://%s:%d%s", internalIp, internalPort, models.HealthCheckAPI)
healthyCheckResp, err := t.HttpClient.Get(healthCheckUrl)
if err != nil {
log.Errorf("Request container healthy failed: %s", err.Error())
taskExecResult.TaskExecError = fmt.Sprintf("worker:%s,%s,%s", conf.GetConfig().SignPublicAddress.Hex(), "Request container healthy failed", err.Error())
t.ExecTaskIdIsSuccess.Store(taskMsg.TaskId, true)
return false
}
body, err := io.ReadAll(healthyCheckResp.Body)
m := &models.HealthyCheck{}
err = json.Unmarshal(body, m)
if err != nil {
log.Errorf("Json unmarshal container healthy body failed: %s", err.Error())
taskExecResult.TaskExecError = fmt.Sprintf("worker:%s,%s,%s", conf.GetConfig().SignPublicAddress.Hex(), "Json unmarshal container healthy body failed", err.Error())
t.ExecTaskIdIsSuccess.Store(taskMsg.TaskId, true)
return false
}
if m.Status != models.READY {
log.Errorf("The container is not ready")
taskExecResult.TaskExecError = fmt.Sprintf("worker:%s,%s", conf.GetConfig().SignPublicAddress.Hex(), "The container is not ready")
t.ExecTaskIdIsSuccess.Store(taskMsg.TaskId, true)
return false
}
return true
}
func (t *TaskHandler) checkIsStopContainer(taskCmd *models.TaskCmd) {
if oldTaskImageName != "" && oldTaskImageName != taskCmd.ImageName {
//todo: 停止标准任务容器
containers := t.DockerOp.ListContainer()
for _, container := range containers {
split := strings.Split(container.Image, ":")
if len(split) == 1 {
container.Image = fmt.Sprintf("%s:%s", container.Image, "latest")
}
if container.Image == taskCmd.ImageName && container.State == "running" {
t.DockerOp.StopContainer(container.ID)
log.WithField("Image name", container.Image).Info("Stopping container")
break
}
}
oldTaskImageName = taskCmd.ImageName
} else {
oldTaskImageName = taskCmd.ImageName
}
}
func parseData(readBody []byte) interface{} {
var m map[string]json.RawMessage
if err := json.Unmarshal(readBody, &m); err != nil {
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment