Commit cb5836ef authored by duanjinfei's avatar duanjinfei

add task ack resp and serial exec task

parent 381dcbb6
......@@ -269,3 +269,16 @@ func FetchStandardTaskResp(params ...interface{}) *nodemanagerV1.WorkerMessage {
log.Info("---------------------------------------Send fetch standard task msg ------------------------------------")
return fetchStandardTaskMsgRes
}
func RespTaskAck(params ...interface{}) *nodemanagerV1.WorkerMessage {
taskId := params[0].(string)
taskAckMsgRes := &nodemanagerV1.WorkerMessage{
Message: &nodemanagerV1.WorkerMessage_SubmitTaskAck{
SubmitTaskAck: &nodemanagerV1.SubmitTaskAck{
TaskId: taskId,
},
},
}
log.WithField("taskId", taskId).Info("---------------------------------------Send task ack msg ------------------------------------")
return taskAckMsgRes
}
......@@ -277,6 +277,8 @@ func handlerMsg(nodeManager *models.NodeManagerClient,
taskMsg := rev.GetPushTaskMessage()
if taskMsg != nil {
params := buildParams(taskMsg.TaskId)
msgRespWorker.RegisterMsgResp(nodeManager, worker, RespTaskAck, params)
go func(msgRespWorker *RespMsgWorker,
taskMsgWorker *TaskHandler, taskMsg *nodeManagerV1.PushTaskMessage) {
if !taskMsgWorker.DockerOp.IsHealthy {
......@@ -284,12 +286,11 @@ func handlerMsg(nodeManager *models.NodeManagerClient,
//msgRespWorker.RegisterMsgResp(nodeManager, worker, GoodbyeResp, params)
return
}
taskMsgWorker.Wg.Wait()
taskMsgWorker.Wg.Add(1)
taskMsgWorker.TaskMsg <- taskMsg
taskMsgWorker.Wg.Wait()
taskMsgWorker.Mutex.Lock()
taskExecResInterface, _ := taskMsgWorker.LruCache.Get(taskMsg.TaskId)
taskMsgWorker.Mutex.Unlock()
//log.WithField("result", taskExecResInterface).Info("lru cache get task result")
taskExecRes := &models.TaskResult{
TaskHttpStatusCode: 200,
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment