Commit a44629ec authored by duanjinfei's avatar duanjinfei

update isCanExecute condition

parent 52549856
......@@ -52,6 +52,9 @@ func (n *NodeManagerHandler) DistributionMsgWorker(nodeManagerMsgChan chan *node
isCanExecute, bootUpTime, queueWaitTime, executeTime := taskMsgWorker.GetAckResp(taskMsg)
ackParams := utils.BuildParams(taskMsg.TaskId, isCanExecute, bootUpTime, queueWaitTime, executeTime)
msgRespWorker.RegisterMsgResp(n.nodeManager, n.worker, RespTaskAck, ackParams)
if !isCanExecute {
return
}
taskMsgWorker.Wg.Add(1)
taskMsgWorker.TaskMsg <- taskMsg
taskMsgWorker.Wg.Wait()
......
......@@ -159,14 +159,13 @@ func (t *TaskWorker) ComputeTaskHandler(taskMsg *nodeManagerV1.PushTaskMessage)
t.checkIsStopContainer(taskOp.taskCmd)
}
log.Info("check is stop container finished")
imageId := t.foundTaskImage(taskOp.taskCmd)
log.Info("found task image finished")
if imageId == "" {
log.Error("The image is not found:", taskOp.taskCmd.ImageName)
taskOp.taskExecResult.TaskExecError = fmt.Sprintf("%s,%s", "The image is not found:", taskOp.taskCmd.ImageName)
t.ExecTaskIdIsFinished.Store(taskMsg.TaskId, true)
return
}
//imageId := t.foundTaskImage(taskOp.taskCmd)
//if imageId == "" {
// log.Error("The image is not found:", taskOp.taskCmd.ImageName)
// taskOp.taskExecResult.TaskExecError = fmt.Sprintf("%s,%s", "The image is not found:", taskOp.taskCmd.ImageName)
// t.ExecTaskIdIsFinished.Store(taskMsg.TaskId, true)
// return
//}
err = json.Unmarshal(taskMsg.TaskParam, taskOp.taskParam)
if err != nil {
log.WithField("err", err).Error("Error unmarshalling task parameter")
......@@ -174,7 +173,7 @@ func (t *TaskWorker) ComputeTaskHandler(taskMsg *nodeManagerV1.PushTaskMessage)
t.ExecTaskIdIsFinished.Store(taskMsg.TaskId, true)
return
}
running, _, _ := t.foundImageIsRunning(imageId)
running, _, _ := t.foundImageIsRunning(taskOp.taskCmd.ImageName)
if !running {
taskOp.taskCmd.DockerCmd.HostIp = models.ZeroHost
taskOp.taskCmd.DockerCmd.HostPort = t.getExternalPort()
......@@ -187,7 +186,7 @@ func (t *TaskWorker) ComputeTaskHandler(taskMsg *nodeManagerV1.PushTaskMessage)
}
log.Infof("Started container with ID %s", containerId)
}
if err = taskOp.waitContainerRunning(t, imageId); err != nil {
if err = taskOp.waitContainerRunning(t, taskOp.taskCmd.ImageName); err != nil {
taskOp.taskExecResult.TaskExecError = fmt.Sprintf("%s", err.Error())
t.ExecTaskIdIsFinished.Store(taskMsg.TaskId, true)
return
......@@ -236,6 +235,11 @@ func (t *TaskWorker) GetAckResp(taskMsg *nodeManagerV1.PushTaskMessage) (isCanEx
}
}
}
if t.foundTaskImage(taskCmd) == "" {
log.WithField("imageName", taskCmd.ImageName).Error("The image is not found")
return
}
log.Info("found task image finished")
isCanExecute = true
modelInfo := t.DockerOp.GetImageInfo(taskCmd.ImageName)
if modelInfo != nil {
......@@ -269,10 +273,10 @@ func (t *TaskWorker) foundTaskImage(taskCmd *models.TaskCmd) (imageId string) {
return
}
func (t *TaskWorker) foundImageIsRunning(imageId string) (bool, string, uint16) {
func (t *TaskWorker) foundImageIsRunning(imageName string) (bool, string, uint16) {
containers := t.DockerOp.ListContainer()
for _, container := range containers {
if container.ImageID == imageId && container.State == "running" {
if container.Image == imageName && container.State == "running" {
networks := container.NetworkSettings.Networks
ip := ""
for _, endPoint := range networks {
......@@ -466,9 +470,9 @@ func (op *TaskOp) checkContainerHealthy(internalIp string, internalPort uint16)
return true, nil
}
func (op *TaskOp) waitContainerRunning(handler *TaskWorker, imageId string) error {
func (op *TaskOp) waitContainerRunning(handler *TaskWorker, imageName string) error {
maxExecTime := op.GetMaxExecTime()
log.WithField("maxExecTime", maxExecTime).Info("Waiting for container running", imageId)
log.WithField("maxExecTime", maxExecTime).Info("Waiting for container running", imageName)
for {
select {
case <-op.ticker.C:
......@@ -476,7 +480,7 @@ func (op *TaskOp) waitContainerRunning(handler *TaskWorker, imageId string) erro
log.Errorf("%s", "The maximum execution time for this task has been exceeded")
return fmt.Errorf("%s", "The maximum execution time for this task has been exceeded")
}
running, internalIp, internalPort := handler.foundImageIsRunning(imageId)
running, internalIp, internalPort := handler.foundImageIsRunning(imageName)
if !running {
continue
}
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment