Commit ac8fe82c authored by duanjinfei's avatar duanjinfei

update serial exec task

parent cb5836ef
......@@ -286,7 +286,6 @@ func handlerMsg(nodeManager *models.NodeManagerClient,
//msgRespWorker.RegisterMsgResp(nodeManager, worker, GoodbyeResp, params)
return
}
taskMsgWorker.Wg.Wait()
taskMsgWorker.Wg.Add(1)
taskMsgWorker.TaskMsg <- taskMsg
taskMsgWorker.Wg.Wait()
......
......@@ -35,9 +35,10 @@ type TaskHandler struct {
HttpClient *http.Client
IsExecAiTask bool
IsExecStandardTask bool
ExecTaskIdIsSuccess *sync.Map
}
var oldTaskImageName string
var oldTaskImageName, oldTaskId string
func NewTaskWorker(op *operate.DockerOp) *TaskHandler {
return &TaskHandler{
......@@ -48,6 +49,7 @@ func NewTaskWorker(op *operate.DockerOp) *TaskHandler {
TaskMsg: make(chan *nodeManagerV1.PushTaskMessage, 0),
HttpClient: &http.Client{},
IsExecAiTask: false,
ExecTaskIdIsSuccess: &sync.Map{},
}
}
......@@ -92,6 +94,35 @@ func (t *TaskHandler) SystemTaskHandler(taskMsg *nodeManagerV1.PushTaskMessage)
func (t *TaskHandler) ComputeTaskHandler(taskMsg *nodeManagerV1.PushTaskMessage) {
defer t.Wg.Done()
if taskMsg.TaskKind == baseV1.TaskKind_ComputeTask {
t.IsExecAiTask = true
if t.IsExecStandardTask {
//todo: 停止标准任务容器
//containers := t.DockerOp.ListContainer()
//for _, container := range containers {
// if container.Image == taskCmd.ImageName && container.State == "running" {
// t.DockerOp.StopContainer(container.ID)
// }
//}
t.IsExecStandardTask = false
}
} else if taskMsg.TaskKind == baseV1.TaskKind_StandardTask {
t.IsExecStandardTask = true
}
if oldTaskId != taskMsg.TaskId {
for {
if oldTaskId == "" {
oldTaskId = taskMsg.TaskId
break
}
value, _ := t.ExecTaskIdIsSuccess.Load(oldTaskId)
isSuccess := value.(bool)
if isSuccess {
oldTaskId = taskMsg.TaskId
break
}
}
}
taskExecResult := &models.TaskResult{
TaskHttpStatusCode: 200,
TaskRespBody: nil,
......@@ -108,25 +139,10 @@ func (t *TaskHandler) ComputeTaskHandler(taskMsg *nodeManagerV1.PushTaskMessage)
if err != nil {
log.Errorf("failed to unmarshal task cmd: %s", err.Error())
taskExecResult.TaskExecError = fmt.Sprintf("%s,%s", "failed to unmarshal task cmd: %s", err.Error())
t.ExecTaskIdIsSuccess.Store(taskMsg.TaskId, true)
return
}
log.Info("received task cmd :", taskCmd)
if taskMsg.TaskKind == baseV1.TaskKind_ComputeTask {
t.IsExecAiTask = true
if t.IsExecStandardTask {
//todo: 停止标准任务容器
//containers := t.DockerOp.ListContainer()
//for _, container := range containers {
// if container.Image == taskCmd.ImageName && container.State == "running" {
// t.DockerOp.StopContainer(container.ID)
// }
//}
t.IsExecStandardTask = false
}
} else if taskMsg.TaskKind == baseV1.TaskKind_StandardTask {
t.IsExecStandardTask = true
}
log.WithField("oldTaskImageName", oldTaskImageName).WithField("newTaskImageName", taskCmd.ImageName).Info("task image info")
if oldTaskImageName != "" && oldTaskImageName != taskCmd.ImageName {
//todo: 停止标准任务容器
......@@ -147,6 +163,7 @@ func (t *TaskHandler) ComputeTaskHandler(taskMsg *nodeManagerV1.PushTaskMessage)
if err != nil {
log.Error("Ps images failed:", err)
taskExecResult.TaskExecError = fmt.Sprintf("%s,%s", "Ps images failed:", err.Error())
t.ExecTaskIdIsSuccess.Store(taskMsg.TaskId, true)
return
}
imageId := ""
......@@ -167,6 +184,7 @@ func (t *TaskHandler) ComputeTaskHandler(taskMsg *nodeManagerV1.PushTaskMessage)
if !isFound {
log.Error("The image is not found:", taskCmd.ImageName)
taskExecResult.TaskExecError = fmt.Sprintf("%s,%s", "The image is not found:", taskCmd.ImageName)
t.ExecTaskIdIsSuccess.Store(taskMsg.TaskId, true)
return
}
running, internalIp, internalPort := t.foundImageIsRunning(imageId)
......@@ -189,6 +207,7 @@ func (t *TaskHandler) ComputeTaskHandler(taskMsg *nodeManagerV1.PushTaskMessage)
if err != nil {
log.Errorf("Create and start container failed: %s", err.Error())
taskExecResult.TaskExecError = fmt.Sprintf("%s,%s", "Create and start container failed", err.Error())
t.ExecTaskIdIsSuccess.Store(taskMsg.TaskId, true)
return
}
log.Infof("Started container with ID %s", containerId)
......@@ -210,6 +229,7 @@ func (t *TaskHandler) ComputeTaskHandler(taskMsg *nodeManagerV1.PushTaskMessage)
if err != nil {
log.WithField("err", err).Error("Error unmarshalling task parameter")
taskExecResult.TaskExecError = fmt.Sprintf("%s,%s", "Error unmarshalling task parameter", err.Error())
t.ExecTaskIdIsSuccess.Store(taskMsg.TaskId, true)
return
}
reqContainerBody := bytes.NewReader(taskParam.Body)
......@@ -221,6 +241,7 @@ func (t *TaskHandler) ComputeTaskHandler(taskMsg *nodeManagerV1.PushTaskMessage)
if err != nil {
log.WithField("error:", err).Error("New container request failed")
taskExecResult.TaskExecError = fmt.Sprintf("%s,%s", "Http client new container request failed", err.Error())
t.ExecTaskIdIsSuccess.Store(taskMsg.TaskId, true)
return
}
request.Header.Set("Content-Type", "application/json")
......@@ -233,17 +254,20 @@ func (t *TaskHandler) ComputeTaskHandler(taskMsg *nodeManagerV1.PushTaskMessage)
if err != nil {
log.WithError(err).Error("json unmarshal task body failed")
taskExecResult.TaskExecError = fmt.Sprintf("%s,%s", "Json unmarshal task body failed", err.Error())
t.ExecTaskIdIsSuccess.Store(taskMsg.TaskId, true)
return
}
if m.WebHook == "" {
log.Error("Request webhook is nil")
taskExecResult.TaskExecError = fmt.Sprintf("%s", "Request webhook is nil")
t.ExecTaskIdIsSuccess.Store(taskMsg.TaskId, true)
return
} else {
_, err := url.Parse(m.WebHook)
if err != nil {
log.WithError(err).Error("web hook url parse failed")
taskExecResult.TaskExecError = fmt.Sprintf("%s,%s", "Web hook url parse failed", err.Error())
t.ExecTaskIdIsSuccess.Store(taskMsg.TaskId, true)
return
}
}
......@@ -255,6 +279,7 @@ func (t *TaskHandler) ComputeTaskHandler(taskMsg *nodeManagerV1.PushTaskMessage)
if err != nil {
log.WithField("error:", err).Error("Http client post request container failed")
taskExecResult.TaskExecError = fmt.Sprintf("%s,%s", "Http client post request container failed", err.Error())
t.ExecTaskIdIsSuccess.Store(taskMsg.TaskId, true)
return
}
endAfterTaskTime := time.Since(startBeforeTaskTime)
......@@ -266,6 +291,7 @@ func (t *TaskHandler) ComputeTaskHandler(taskMsg *nodeManagerV1.PushTaskMessage)
if err != nil {
taskExecResult.TaskExecError = fmt.Sprintf("%s,%s,Container Http Code:%d", "Read container body failed", err.Error(), post.StatusCode)
log.Error("Read container body failed", err)
t.ExecTaskIdIsSuccess.Store(taskMsg.TaskId, true)
return
}
isUseFileCache := true
......@@ -410,6 +436,7 @@ func (t *TaskHandler) ComputeTaskHandler(taskMsg *nodeManagerV1.PushTaskMessage)
if err != nil {
log.WithError(err).Error("JSON marshal container header failed")
taskExecResult.TaskExecError = fmt.Sprintf("%s,%s", "JSON marshal container header failed", err.Error())
t.ExecTaskIdIsSuccess.Store(taskMsg.TaskId, true)
return
}
log.WithField("headers", post.Header).Info("return task http headers")
......@@ -422,9 +449,11 @@ func (t *TaskHandler) ComputeTaskHandler(taskMsg *nodeManagerV1.PushTaskMessage)
all, err := io.ReadAll(post.Body)
if err != nil {
log.Error("JSON read error: ", err)
taskExecResult.TaskExecError = fmt.Sprintf("%s,Container Http Code:%d,err:%s", "Read container body failed", post.StatusCode, err)
t.ExecTaskIdIsSuccess.Store(taskMsg.TaskId, true)
return
}
taskExecResult.TaskExecError = fmt.Sprintf("%s,Container Http Code:%d,body:%s", "Read container body failed", post.StatusCode, string(all))
taskExecResult.TaskExecError = fmt.Sprintf("%s,Container Http Code:%d,body:%s", "Container is exec failed", post.StatusCode, string(all))
} else {
taskExecResult.TaskExecError = fmt.Sprintf("%s,Container Http Code:%d,body:%s", "Read container body failed", post.StatusCode, "")
}
......@@ -438,6 +467,7 @@ func (t *TaskHandler) ComputeTaskHandler(taskMsg *nodeManagerV1.PushTaskMessage)
} else if taskMsg.TaskKind == baseV1.TaskKind_StandardTask {
t.IsExecStandardTask = false
}
t.ExecTaskIdIsSuccess.Store(taskMsg.TaskId, true)
//log.WithField("result", taskExecResult).Info("lru cache storage task result")
log.Info("received computeTask--------------------------------")
}
......
package test
import (
"encoding/json"
"fmt"
"sync"
"testing"
"time"
)
func Test_initConfig(t *testing.T) {
......@@ -14,32 +15,55 @@ func Test_initConfig(t *testing.T) {
//address := crypto.PubkeyToAddress(prvKey.PublicKey)
//fmt.Println("address:", address)
// JSON 2 数据
jsonData := `{
"completed_at": "2023-07-02T02:13:48.764861Z",
"output": ["ss","sss"]
}`
// 解析 JSON 数据到 map[string]json.RawMessage
var m map[string]json.RawMessage
if err := json.Unmarshal([]byte(jsonData), &m); err != nil {
fmt.Println("解析 JSON 数据时出错:", err)
return
}
// 解析 "output" 字段
//var output [][]string
//// JSON 2 数据
//jsonData := `{
// "completed_at": "2023-07-02T02:13:48.764861Z",
// "output": ["ss","sss"]
//}`
//
//// 解析 JSON 数据到 map[string]json.RawMessage
//var m map[string]json.RawMessage
//if err := json.Unmarshal([]byte(jsonData), &m); err != nil {
// fmt.Println("解析 JSON 数据时出错:", err)
// return
//}
//
//// 解析 "output" 字段
////var output [][]string
////if err := json.Unmarshal(m["output"], &output); err != nil {
//// fmt.Println("解析 output 字段时出错:", err)
//// return
////}
//
//var output []string
//if err := json.Unmarshal(m["output"], &output); err != nil {
// fmt.Println("解析 output 字段时出错:", err)
// return
//}
//
//// 输出结果
//fmt.Println("Output Type:", output)
var output []string
if err := json.Unmarshal(m["output"], &output); err != nil {
fmt.Println("解析 output 字段时出错:", err)
return
}
wg := &sync.WaitGroup{}
ticker := time.NewTicker(time.Second * 1)
for {
select {
case <-ticker.C:
go func(wg *sync.WaitGroup) {
wg.Wait()
fmt.Println("111")
wg.Add(1)
go Sell(wg)
wg.Wait()
// 输出结果
fmt.Println("Output Type:", output)
fmt.Println("333")
}(wg)
}
}
}
func Sell(wg *sync.WaitGroup) {
defer wg.Done()
time.Sleep(6 * time.Second)
fmt.Println("222")
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment