Commit 44642bba authored by duanjinfei

update task handler

parent f2771c8b
......@@ -13,9 +13,13 @@ const (
UseRedirect = "Use-Redirect"
Prefer = "Prefer"
Async = "respond-async"
MaxExecTime = "MaxExecTime"
HealthCheckAPI = "/health-check"
ReplicateImageNameSuffix = "docker.agicoin.ai/agicoin"
READY = "READY"
ZeroHost = "0.0.0.0"
ModelPublishStatusYes = 1
ModelPublishStatusNo = 2
DefaultMaxExecTime = 300
DefaultTaskTimer = 2
)
......@@ -25,14 +25,13 @@ import (
"time"
)
type TaskHandler struct {
type TaskWorker struct {
Wg *sync.WaitGroup
Mutex *sync.Mutex
LruCache *lru.Cache
DockerOp *operate.DockerOp
CmdOp *operate.Command
TaskMsg chan *nodeManagerV1.PushTaskMessage
HttpClient *http.Client
IsExecAiTask bool
IsExecStandardTask bool
ExecTaskIdIsSuccess *sync.Map
......@@ -40,22 +39,32 @@ type TaskHandler struct {
oldTaskId string
}
func NewTaskWorker(op *operate.DockerOp) *TaskHandler {
return &TaskHandler{
// TaskOp bundles the per-task state used while a single compute task is
// executed: the incoming message, its decoded command and parameters, the
// result record being filled in, and the HTTP/timing helpers used to talk to
// the task container.
type TaskOp struct {
// taskMsg is the pushed task message this operation executes.
taskMsg *nodeManagerV1.PushTaskMessage
// taskCmd is decoded from the JSON in taskMsg.TaskCmd.
taskCmd *models.TaskCmd
// taskExecResult is the result record for the task; ComputeTaskHandler also
// stores it in the worker's LRU cache under the task ID.
taskExecResult *models.TaskResult
// taskParam is decoded from the JSON in taskMsg.TaskParam.
taskParam *models.TaskParam
// httpClient issues the container request, the health check and the OSS upload.
httpClient *http.Client
// request is the POST request sent to the container's API URL.
request *http.Request
// ticker paces the polling loops; ComputeTaskHandler creates it with a period
// of models.DefaultTaskTimer seconds.
ticker *time.Ticker
// startBeforeTaskTime is captured when the TaskOp is created and is used to
// measure elapsed time against the task's maximum execution time.
startBeforeTaskTime time.Time
}
func NewTaskWorker(op *operate.DockerOp) *TaskWorker {
return &TaskWorker{
Wg: &sync.WaitGroup{},
Mutex: &sync.Mutex{},
LruCache: lru.New(100),
DockerOp: op,
TaskMsg: make(chan *nodeManagerV1.PushTaskMessage, 0),
HttpClient: &http.Client{},
IsExecAiTask: false,
ExecTaskIdIsSuccess: &sync.Map{},
}
}
func (t *TaskHandler) DistributionTaskWorker(runCount int) {
func (t *TaskWorker) DistributionTaskWorker(runCount int) {
for i := 0; i < runCount; i++ {
go func(t *TaskHandler) {
go func(t *TaskWorker) {
for {
select {
case taskMsg := <-t.TaskMsg:
......@@ -86,7 +95,7 @@ func (t *TaskHandler) DistributionTaskWorker(runCount int) {
}
}
func (t *TaskHandler) GetMinerSign(msg *nodeManagerV1.PushTaskMessage, taskResult []byte) ([]byte, []byte, []byte) {
func (t *TaskWorker) GetMinerSign(msg *nodeManagerV1.PushTaskMessage, taskResult []byte) ([]byte, []byte, []byte) {
reqHash := crypto.Keccak256Hash(msg.TaskParam)
respHash := crypto.Keccak256Hash(taskResult)
signHash := crypto.Keccak256Hash(bytes.NewBufferString(msg.TaskId).Bytes(), reqHash.Bytes(), respHash.Bytes())
......@@ -99,384 +108,122 @@ func (t *TaskHandler) GetMinerSign(msg *nodeManagerV1.PushTaskMessage, taskResul
return reqHash.Bytes(), respHash.Bytes(), sign
}
func (t *TaskHandler) SystemTaskHandler(taskMsg *nodeManagerV1.PushTaskMessage) {
// SystemTaskHandler handles a system task message. It currently only logs
// that the task was received and signals completion on the worker's
// WaitGroup when it returns; taskMsg itself is not inspected.
func (t *TaskWorker) SystemTaskHandler(taskMsg *nodeManagerV1.PushTaskMessage) {
defer t.Wg.Done()
log.Info("received systemTask--------------------------------")
}
func (t *TaskHandler) ComputeTaskHandler(taskMsg *nodeManagerV1.PushTaskMessage) {
// CustomTaskHandler handles a custom task message. It lists the local docker
// images through DockerOp and logs an error and returns early if that call
// fails; otherwise it only logs that the task was received. Completion is
// always signalled on the worker's WaitGroup.
func (t *TaskWorker) CustomTaskHandler(taskMsg *nodeManagerV1.PushTaskMessage) {
	defer t.Wg.Done()

	// The image list itself is not used; only the error is checked.
	if _, psErr := t.DockerOp.PsImages(); psErr != nil {
		log.Error("custom task handler docker op ps images failed: ", psErr)
		return
	}

	log.Info("received customTask--------------------------------")
}
func (t *TaskWorker) ComputeTaskHandler(taskMsg *nodeManagerV1.PushTaskMessage) {
defer t.Wg.Done()
t.checkLastTaskExecStatus(taskMsg)
log.Info("check last task exec status successful")
taskExecResult := &models.TaskResult{
TaskHttpStatusCode: 200,
TaskRespBody: nil,
TaskHttpHeaders: nil,
TaskIsSuccess: false,
TaskExecTime: 0,
TaskExecError: "",
}
t.LruCache.Add(taskMsg.TaskId, taskExecResult)
taskCmd := &models.TaskCmd{}
err := json.Unmarshal(bytes.NewBufferString(taskMsg.TaskCmd).Bytes(), taskCmd)
taskOp := &TaskOp{
taskMsg: taskMsg,
taskCmd: &models.TaskCmd{},
taskExecResult: &models.TaskResult{
TaskHttpStatusCode: 200,
TaskRespBody: nil,
TaskHttpHeaders: nil,
TaskIsSuccess: false,
TaskExecTime: 0,
TaskExecError: "",
},
taskParam: &models.TaskParam{},
httpClient: &http.Client{},
request: &http.Request{},
ticker: time.NewTicker(time.Second * models.DefaultTaskTimer),
startBeforeTaskTime: time.Now(),
}
t.LruCache.Add(taskMsg.TaskId, taskOp.taskExecResult)
err := json.Unmarshal(bytes.NewBufferString(taskMsg.TaskCmd).Bytes(), taskOp.taskCmd)
if err != nil {
log.Errorf("failed to unmarshal task cmd: %s", err.Error())
taskExecResult.TaskExecError = fmt.Sprintf("%s,%s", "failed to unmarshal task cmd: %s", err.Error())
taskOp.taskExecResult.TaskExecError = fmt.Sprintf("%s,%s", "failed to unmarshal task cmd: %s", err.Error())
t.ExecTaskIdIsSuccess.Store(taskMsg.TaskId, true)
return
}
taskCmd.ImageName = fmt.Sprintf("%s-%s", taskCmd.ImageName, conf.GetConfig().OpSys)
log.Info("received task cmd :", taskCmd)
log.WithField("t.oldTaskImageName", t.oldTaskImageName).WithField("newTaskImageName", taskCmd.ImageName).Info("task image info")
taskOp.taskCmd.ImageName = fmt.Sprintf("%s-%s", taskOp.taskCmd.ImageName, conf.GetConfig().OpSys)
log.Info("received task cmd :", taskOp.taskCmd)
log.WithField("t.oldTaskImageName", t.oldTaskImageName).WithField("newTaskImageName", taskOp.taskCmd.ImageName).Info("task image info")
if taskMsg.TaskKind != baseV1.TaskKind_StandardTask {
t.checkIsStopContainer(taskCmd)
t.checkIsStopContainer(taskOp.taskCmd)
}
log.Info("check is stop container finished")
isFound, imageId := t.foundTaskImage(taskCmd)
imageId := t.foundTaskImage(taskOp.taskCmd)
log.Info("found task image finished")
if !isFound || imageId == "" {
log.Error("The image is not found:", taskCmd.ImageName)
taskExecResult.TaskExecError = fmt.Sprintf("%s,%s", "The image is not found:", taskCmd.ImageName)
if imageId == "" {
log.Error("The image is not found:", taskOp.taskCmd.ImageName)
taskOp.taskExecResult.TaskExecError = fmt.Sprintf("%s,%s", "The image is not found:", taskOp.taskCmd.ImageName)
t.ExecTaskIdIsSuccess.Store(taskMsg.TaskId, true)
return
}
err = json.Unmarshal(taskMsg.TaskParam, taskOp.taskParam)
if err != nil {
log.WithField("err", err).Error("Error unmarshalling task parameter")
taskOp.taskExecResult.TaskExecError = fmt.Sprintf("%s,%s", "Error unmarshalling task parameter", err.Error())
t.ExecTaskIdIsSuccess.Store(taskMsg.TaskId, true)
return
}
running, internalIp, internalPort := t.foundImageIsRunning(imageId)
if !running {
var externalPort int64
for {
// 设置种子以确保每次运行时生成不同的随机数序列
rand.Seed(time.Now().UnixNano())
// 生成一个介于 0 和 100 之间的随机整数
externalPort = rand.Int63n(10001) + 10000
log.Info("DockerOp UsedExternalPort :", t.DockerOp.UsedExternalPort[externalPort])
if t.DockerOp.UsedExternalPort[externalPort] {
continue
}
break
}
taskCmd.DockerCmd.HostIp = "0.0.0.0"
taskCmd.DockerCmd.HostPort = strconv.FormatInt(externalPort, 10)
containerId, err := t.DockerOp.CreateAndStartContainer(taskCmd.ImageName, taskCmd.DockerCmd)
taskOp.taskCmd.DockerCmd.HostIp = models.ZeroHost
taskOp.taskCmd.DockerCmd.HostPort = t.getExternalPort()
containerId, err := t.DockerOp.CreateAndStartContainer(taskOp.taskCmd.ImageName, taskOp.taskCmd.DockerCmd)
if err != nil {
log.Errorf("Create and start container failed: %s", err.Error())
taskExecResult.TaskExecError = fmt.Sprintf("%s,%s", "Create and start container failed", err.Error())
taskOp.taskExecResult.TaskExecError = fmt.Sprintf("%s,%s", "Create and start container failed", err.Error())
t.ExecTaskIdIsSuccess.Store(taskMsg.TaskId, true)
return
}
log.Infof("Started container with ID %s", containerId)
time.Sleep(time.Second * 70)
running, internalIp, internalPort = t.foundImageIsRunning(imageId)
if running {
isMatch := strings.HasPrefix(taskCmd.ImageName, models.ReplicateImageNameSuffix)
if isMatch {
if !t.checkContainerHealthy(internalIp, internalPort, taskMsg, taskExecResult) {
t.ExecTaskIdIsSuccess.Store(taskMsg.TaskId, true)
return
}
}
taskCmd.ApiUrl = fmt.Sprintf("http://%s:%d%s", internalIp, internalPort, taskCmd.ApiUrl)
log.Info("Container ports:", internalPort)
log.WithField("ApiUrl", taskCmd.ApiUrl).Info("The image is not running")
if err = taskOp.waitContainerRunning(t, imageId); err != nil {
taskOp.taskExecResult.TaskExecError = fmt.Sprintf("%s", err.Error())
t.ExecTaskIdIsSuccess.Store(taskMsg.TaskId, true)
return
}
} else {
taskCmd.ApiUrl = fmt.Sprintf("http://%s:%d%s", internalIp, internalPort, taskCmd.ApiUrl)
taskOp.taskCmd.ApiUrl = fmt.Sprintf("http://%s:%d%s", internalIp, internalPort, taskOp.taskCmd.ApiUrl)
log.Info("Container ports:", internalPort)
log.WithField("ApiUrl", taskCmd.ApiUrl).Info("The image is running")
log.WithField("ApiUrl", taskOp.taskCmd.ApiUrl).Info("The image is running")
}
startBeforeTaskTime := time.Now()
taskParam := &models.TaskParam{}
err = json.Unmarshal(taskMsg.TaskParam, taskParam)
if err != nil {
log.WithField("err", err).Error("Error unmarshalling task parameter")
taskExecResult.TaskExecError = fmt.Sprintf("%s,%s", "Error unmarshalling task parameter", err.Error())
t.ExecTaskIdIsSuccess.Store(taskMsg.TaskId, true)
return
reqContainerBody := bytes.NewReader(taskOp.taskParam.Body)
if len(taskOp.taskParam.Queries) > 0 {
queryString := utils.MatchContainerQueryString(taskOp.taskParam.Queries)
taskOp.taskCmd.ApiUrl = fmt.Sprintf("%s?%s", taskOp.taskCmd.ApiUrl, queryString)
log.WithField("ApiUrl", taskOp.taskCmd.ApiUrl).Info("The task param query str not empty")
}
reqContainerBody := bytes.NewReader(taskParam.Body)
if len(taskParam.Queries) > 0 {
queryString := utils.MatchContainerQueryString(taskParam.Queries)
taskCmd.ApiUrl = fmt.Sprintf("%s?%s", taskCmd.ApiUrl, queryString)
}
request, err := http.NewRequest("POST", taskCmd.ApiUrl, reqContainerBody)
taskOp.request, err = http.NewRequest("POST", taskOp.taskCmd.ApiUrl, reqContainerBody)
if err != nil {
log.WithField("error:", err).Error("New container request failed")
taskExecResult.TaskExecError = fmt.Sprintf("%s,%s", "Http client new container request failed", err.Error())
taskOp.taskExecResult.TaskExecError = fmt.Sprintf("%s,%s", "Http client new container request failed", err.Error())
t.ExecTaskIdIsSuccess.Store(taskMsg.TaskId, true)
return
}
request.Header.Set("Content-Type", "application/json")
for key, value := range taskParam.Headers {
if key == models.Prefer {
if value[0] == models.Async {
request.Header.Set(models.Prefer, models.Async)
m := &models.ContainerRequest{}
err := json.Unmarshal(taskParam.Body, m)
if err != nil {
log.WithError(err).Error("json unmarshal task body failed")
taskExecResult.TaskExecError = fmt.Sprintf("%s,%s", "Json unmarshal task body failed", err.Error())
t.ExecTaskIdIsSuccess.Store(taskMsg.TaskId, true)
return
}
if m.WebHook == "" {
log.Error("Request webhook is nil")
taskExecResult.TaskExecError = fmt.Sprintf("%s", "Request webhook is nil")
t.ExecTaskIdIsSuccess.Store(taskMsg.TaskId, true)
return
} else {
_, err := url.Parse(m.WebHook)
if err != nil {
log.WithError(err).Error("web hook url parse failed")
taskExecResult.TaskExecError = fmt.Sprintf("%s,%s", "Web hook url parse failed", err.Error())
t.ExecTaskIdIsSuccess.Store(taskMsg.TaskId, true)
return
}
}
break
}
}
taskOp.request.Header.Set("Content-Type", "application/json")
if err = taskOp.validateWebHook(); err != nil {
taskOp.taskExecResult.TaskExecError = fmt.Sprintf("%s", err.Error())
t.ExecTaskIdIsSuccess.Store(taskMsg.TaskId, true)
return
}
post, err := t.HttpClient.Do(request)
if err != nil {
log.WithField("error:", err).Error("Http client post request container failed")
taskExecResult.TaskExecError = fmt.Sprintf("%s,%s", "Http client post request container failed", err.Error())
if err = taskOp.waitReqContainerOk(t.DockerOp); err != nil {
taskOp.taskExecResult.TaskExecError = fmt.Sprintf("%s", err.Error())
t.ExecTaskIdIsSuccess.Store(taskMsg.TaskId, true)
return
}
endAfterTaskTime := time.Since(startBeforeTaskTime)
endAfterTaskTime := time.Since(taskOp.startBeforeTaskTime)
taskOp.taskExecResult.TaskExecTime = endAfterTaskTime.Microseconds()
log.WithField("time", endAfterTaskTime.Seconds()).WithField("taskId", taskMsg.TaskId).Info("Exec task end (second is units) :")
log.WithField("StatusCode", post.StatusCode).WithField("taskId", taskMsg.TaskId).Info("Exec task result")
if post.StatusCode == http.StatusOK {
taskExecResult.TaskHttpStatusCode = http.StatusOK
readBody, err := io.ReadAll(post.Body)
if err != nil {
taskExecResult.TaskExecError = fmt.Sprintf("%s,%s,Container Http Code:%d", "Read container body failed", err.Error(), post.StatusCode)
log.Error("Read container body failed", err)
t.ExecTaskIdIsSuccess.Store(taskMsg.TaskId, true)
return
}
if taskMsg.TaskKind != baseV1.TaskKind_StandardTask {
isUseFileCache := true
isUseRedirect := false
for key, value := range taskParam.Headers {
log.WithField("key", key).WithField("val", value).Debug("Headers Info")
if key == models.UseRedirect {
log.WithField("UseRedirect", value[0]).Info("Headers info")
if value[0] == "true" {
isUseRedirect = true
}
}
if key == models.UseFileCache {
log.WithField("UseFileCache", value[0]).Info("Headers info")
if value[0] == "false" {
isUseFileCache = false
break
}
}
}
log.WithField("isUseRedirect", isUseRedirect).Info("is use redirect")
log.WithField("isUseFileCache", isUseFileCache).Info("is use file cache")
if readBody != nil {
data := parseData(readBody)
if data != nil {
isSuccess := false
switch v := data.(type) {
case [][]string:
{
res := data.([][]string)
log.Info("data is [][]string type")
apiRes := make([][]string, 1)
for _, innerSlice := range res {
apiResOneArr := make([]string, 0)
for _, respStr := range innerSlice {
if respStr == "" || respStr == "null" {
continue
}
if !isUseFileCache {
apiResOneArr = append(apiResOneArr, respStr)
isSuccess = true
continue
}
ossUri, err := t.getFileCache(respStr, taskMsg, taskParam, taskCmd)
if err != nil || ossUri == "" {
if err != nil {
taskExecResult.TaskExecError = fmt.Sprintf("%s,%s", "Get file cache uri failed", err)
}
if ossUri == "" {
apiResOneArr = append(apiResOneArr, respStr)
isSuccess = true
}
continue
}
if isUseRedirect && ossUri != "" && len(res) == 1 && len(innerSlice) == 1 {
taskExecResult.TaskHttpStatusCode = models.RedirectCode
apiResBody := utils.EncodeJsonEscapeHTML(ossUri)
taskExecResult.TaskRespBody = apiResBody
post.Header.Set("Location", ossUri)
isSuccess = true
break
} else {
apiResOneArr = append(apiResOneArr, ossUri)
isSuccess = true
}
}
apiRes = append(apiRes, apiResOneArr)
}
if !isSuccess {
taskExecResult.TaskExecError = fmt.Sprintf("%s-%s", "Container output is nil", string(readBody))
t.ExecTaskIdIsSuccess.Store(taskMsg.TaskId, true)
return
}
if !isUseRedirect {
apiResBody := utils.EncodeJsonEscapeHTML(apiRes)
taskExecResult.TaskRespBody = apiResBody
}
}
case []string:
{
res := data.([]string)
log.Info("data is []string type")
apiRes := make([]string, 0)
for _, respStr := range res {
if respStr == "" || respStr == "null" {
continue
}
if !isUseFileCache {
apiRes = append(apiRes, respStr)
isSuccess = true
continue
}
ossUri, err := t.getFileCache(respStr, taskMsg, taskParam, taskCmd)
if err != nil || ossUri == "" {
if err != nil {
taskExecResult.TaskExecError = fmt.Sprintf("%s,%s", "Get file cache uri failed", err)
}
if ossUri == "" {
apiRes = append(apiRes, respStr)
isSuccess = true
}
continue
}
if isUseRedirect && ossUri != "" && len(res) == 1 {
taskExecResult.TaskHttpStatusCode = models.RedirectCode
post.Header.Set("Location", ossUri)
apiResBody := utils.EncodeJsonEscapeHTML(ossUri)
taskExecResult.TaskRespBody = apiResBody
isSuccess = true
break
} else {
apiRes = append(apiRes, ossUri)
isSuccess = true
}
}
if !isSuccess {
taskExecResult.TaskExecError = fmt.Sprintf("%s-%s", "Container output is nil", string(readBody))
t.ExecTaskIdIsSuccess.Store(taskMsg.TaskId, true)
return
}
if !isUseRedirect {
apiResBody := utils.EncodeJsonEscapeHTML(apiRes)
taskExecResult.TaskRespBody = apiResBody
}
}
case string:
{
resStr := data.(string)
log.Info("data is string type")
resArr := []string{resStr}
apiRes := make([]string, 0)
for _, respStr := range resArr {
if respStr == "" || respStr == "null" {
continue
}
if !isUseFileCache {
apiRes = append(apiRes, respStr)
isSuccess = true
continue
}
ossUri, err := t.getFileCache(respStr, taskMsg, taskParam, taskCmd)
if err != nil || ossUri == "" {
if err != nil {
taskExecResult.TaskExecError = fmt.Sprintf("%s,%s", "Get file cache uri failed", err)
}
if ossUri == "" {
apiRes = append(apiRes, respStr)
isSuccess = true
}
continue
}
if isUseRedirect && ossUri != "" && len(resArr) == 1 {
taskExecResult.TaskHttpStatusCode = models.RedirectCode
post.Header.Set("Location", ossUri)
apiResBody := utils.EncodeJsonEscapeHTML(ossUri)
taskExecResult.TaskRespBody = apiResBody
isSuccess = true
break
} else {
apiRes = append(apiRes, ossUri)
isSuccess = true
}
}
if !isSuccess {
taskExecResult.TaskExecError = fmt.Sprintf("%s-%s", "Container output is nil", string(readBody))
t.ExecTaskIdIsSuccess.Store(taskMsg.TaskId, true)
return
}
if !isUseRedirect {
apiResBody := utils.EncodeJsonEscapeHTML(apiRes)
taskExecResult.TaskRespBody = apiResBody
}
}
default:
log.Error("data is unknown type", v)
taskExecResult.TaskExecError = fmt.Sprintf("%s", "Container resp data is unknown type")
t.ExecTaskIdIsSuccess.Store(taskMsg.TaskId, true)
return
}
} else {
log.Error("Container resp output is nil")
taskExecResult.TaskExecError = fmt.Sprintf("%s", "Container resp output is nil")
t.ExecTaskIdIsSuccess.Store(taskMsg.TaskId, true)
taskExecResult.TaskRespBody = readBody
return
}
}
}
if taskMsg.TaskKind == baseV1.TaskKind_StandardTask {
taskExecResult.TaskRespBody = readBody
}
headers, err := json.Marshal(post.Header)
if err != nil {
log.WithError(err).Error("JSON marshal container header failed")
taskExecResult.TaskExecError = fmt.Sprintf("%s,%s", "JSON marshal container header failed", err.Error())
t.ExecTaskIdIsSuccess.Store(taskMsg.TaskId, true)
return
}
log.WithField("headers", post.Header).Info("return task http headers")
taskExecResult.TaskHttpHeaders = headers
taskExecResult.TaskIsSuccess = true
taskExecResult.TaskExecTime = endAfterTaskTime.Microseconds()
} else {
taskExecResult.TaskHttpStatusCode = int32(post.StatusCode)
if post.Body != nil {
all, err := io.ReadAll(post.Body)
if err != nil {
log.Error("JSON read error: ", err)
taskExecResult.TaskExecError = fmt.Sprintf("%s,Container Http Code:%d,err:%s", "Read container body failed", post.StatusCode, err)
t.ExecTaskIdIsSuccess.Store(taskMsg.TaskId, true)
return
}
if taskExecResult.TaskHttpStatusCode == http.StatusConflict {
taskExecResult.TaskExecError = fmt.Sprintf("%s,Container Http Code:%d", "Already running a prediction", post.StatusCode)
} else {
taskExecResult.TaskExecError = fmt.Sprintf("%s,Container Http Code:%d,body:%s", "Container is exec failed", post.StatusCode, string(all))
}
} else {
taskExecResult.TaskExecError = fmt.Sprintf("%s,Container Http Code:%d,body:%s", "Container resp body is nil", post.StatusCode, "")
}
log.WithField("error", post.Body).WithField("taskId", taskMsg.TaskId).Error("Exec task result is failed")
}
if taskMsg.TaskKind == baseV1.TaskKind_ComputeTask {
t.IsExecAiTask = false
} else if taskMsg.TaskKind == baseV1.TaskKind_StandardTask {
......@@ -484,24 +231,13 @@ func (t *TaskHandler) ComputeTaskHandler(taskMsg *nodeManagerV1.PushTaskMessage)
}
t.ExecTaskIdIsSuccess.Store(taskMsg.TaskId, true)
//log.WithField("result", taskExecResult).Info("lru cache storage task result")
log.Info("received computeTask--------------------------------")
}
func (t *TaskHandler) CustomTaskHandler(taskMsg *nodeManagerV1.PushTaskMessage) {
defer t.Wg.Done()
_, err := t.DockerOp.PsImages()
if err != nil {
log.Error("custom task handler docker op ps images failed: ", err)
return
}
log.Info("received customTask--------------------------------")
log.Info("----------------------Compute task exec done--------------------------------")
}
func (t *TaskHandler) foundTaskImage(taskCmd *models.TaskCmd) (isSuccess bool, imageId string) {
func (t *TaskWorker) foundTaskImage(taskCmd *models.TaskCmd) (imageId string) {
images, err := t.DockerOp.PsImages()
if err != nil {
log.Error("Ps images failed:", err)
isSuccess = false
imageId = ""
return
}
......@@ -519,11 +255,10 @@ func (t *TaskHandler) foundTaskImage(taskCmd *models.TaskCmd) (isSuccess bool, i
}
}
}
isSuccess = isFound
return
}
func (t *TaskHandler) foundImageIsRunning(imageId string) (bool, string, uint16) {
func (t *TaskWorker) foundImageIsRunning(imageId string) (bool, string, uint16) {
containers := t.DockerOp.ListContainer()
for _, container := range containers {
if container.ImageID == imageId && container.State == "running" {
......@@ -539,7 +274,91 @@ func (t *TaskHandler) foundImageIsRunning(imageId string) (bool, string, uint16)
return false, "", 0
}
func (t *TaskHandler) uploadOSS(taskId string, queries string, decodedImage []byte, suffix string) (string, error) {
// checkLastTaskExecStatus records which kind of task is about to run and then
// waits for the previously dispatched task to finish.
//
// For a compute task it sets IsExecAiTask and clears IsExecStandardTask; for a
// standard task it sets IsExecStandardTask. If taskMsg carries a different
// task ID than the last one seen, it polls ExecTaskIdIsSuccess until the old
// task ID is marked true, until there is no old task ID, or until more than
// conf.GetConfig().WaitLastTaskExecTime seconds have elapsed. In every exit
// path oldTaskId is updated to the new task ID.
func (t *TaskWorker) checkLastTaskExecStatus(taskMsg *nodeManagerV1.PushTaskMessage) {
	if taskMsg.TaskKind == baseV1.TaskKind_ComputeTask {
		t.IsExecAiTask = true
		if t.IsExecStandardTask {
			//todo: stop the standard task container
			//containers := t.DockerOp.ListContainer()
			//for _, container := range containers {
			//	if container.Image == taskCmd.ImageName && container.State == "running" {
			//		t.DockerOp.StopContainer(container.ID)
			//	}
			//}
			t.IsExecStandardTask = false
		}
	} else if taskMsg.TaskKind == baseV1.TaskKind_StandardTask {
		t.IsExecStandardTask = true
	}

	if t.oldTaskId == taskMsg.TaskId {
		return
	}

	// pollInterval keeps the wait loop from spinning at full speed while the
	// previous task is still running.
	const pollInterval = 10 * time.Millisecond

	now := time.Now()
	for {
		since := time.Since(now)
		if int64(since.Seconds()) > conf.GetConfig().WaitLastTaskExecTime {
			// Give up waiting once the configured limit is exceeded.
			log.WithField("taskId", t.oldTaskId).Info("Waiting for last task execution ending")
			t.oldTaskId = taskMsg.TaskId
			break
		}
		if t.oldTaskId == "" {
			// No previous task to wait for.
			t.oldTaskId = taskMsg.TaskId
			break
		}
		if value, ok := t.ExecTaskIdIsSuccess.Load(t.oldTaskId); ok {
			// Checked assertion: a non-bool entry is treated as "not finished"
			// instead of panicking.
			if isSuccess, isBool := value.(bool); isBool && isSuccess {
				t.oldTaskId = taskMsg.TaskId
				log.WithField("taskId", t.oldTaskId).Info("Task exec success")
				break
			}
		}
		time.Sleep(pollInterval)
	}
}
// checkIsStopContainer stops the running container of the previously used
// task image when the incoming task uses a different image, then remembers
// the new image name in oldTaskImageName.
//
// Container image names without a tag are compared as "<name>:latest". Only
// the first matching running container is stopped.
func (t *TaskWorker) checkIsStopContainer(taskCmd *models.TaskCmd) {
	imageChanged := t.oldTaskImageName != "" && t.oldTaskImageName != taskCmd.ImageName
	if imageChanged {
		//todo: stop the standard task container
		for _, container := range t.DockerOp.ListContainer() {
			// An image name with no ":" carries no tag; default it to latest.
			if !strings.Contains(container.Image, ":") {
				container.Image = fmt.Sprintf("%s:%s", container.Image, "latest")
			}
			log.WithField("containerImageName", container.Image).WithField("t.oldTaskImageName", t.oldTaskImageName).Info("match image")
			if container.State != "running" || container.Image != t.oldTaskImageName {
				continue
			}
			t.DockerOp.StopContainer(container.ID)
			log.WithField("Image name", container.Image).Info("Stopping container")
			//t.DockerOp.RunningImages[t.oldTaskImageName] = false
			break
		}
	}
	// Both the changed and unchanged cases record the current image name.
	t.oldTaskImageName = taskCmd.ImageName
}
// getExternalPort picks a random host port in the range [10000, 20000] that is
// not marked as used in t.DockerOp.UsedExternalPort and returns it as a
// decimal string. It keeps drawing candidates until an unused one is found.
func (t *TaskWorker) getExternalPort() (externalPort string) {
	// Use one privately seeded generator per call rather than re-seeding the
	// shared global source on every iteration (rand.Seed is deprecated since
	// Go 1.20 and re-seeding in a loop adds no randomness).
	rng := rand.New(rand.NewSource(time.Now().UnixNano()))
	for {
		// Int63n(10001) yields 0..10000, so the candidate is 10000..20000.
		externalPortInt := rng.Int63n(10001) + 10000
		inUse := t.DockerOp.UsedExternalPort[externalPortInt]
		log.WithField("externalPortInt", externalPortInt).Info("DockerOp UsedExternalPort :", inUse)
		if inUse {
			continue
		}
		return strconv.FormatInt(externalPortInt, 10)
	}
}
func (op *TaskOp) uploadOSS(taskId string, queries string, decodedImage []byte, suffix string) (string, error) {
var requestBody bytes.Buffer
writer := multipart.NewWriter(&requestBody)
// 创建文件表单字段
......@@ -566,7 +385,7 @@ func (t *TaskHandler) uploadOSS(taskId string, queries string, decodedImage []by
return "", err
}
request.Header.Set("Content-Type", writer.FormDataContentType())
response, err := t.HttpClient.Do(request)
response, err := op.httpClient.Do(request)
if err != nil {
log.WithError(err).Error("Error request oss failed")
return "", err
......@@ -595,13 +414,13 @@ func (t *TaskHandler) uploadOSS(taskId string, queries string, decodedImage []by
return "", err
}
func (t *TaskHandler) getFileCache(respStr string, taskMsg *nodeManagerV1.PushTaskMessage, taskParam *models.TaskParam, taskCmd *models.TaskCmd) (string, error) {
func (op *TaskOp) getFileCache(respStr string, dockerOp *operate.DockerOp) (string, error) {
isBase64, decodeByte, respFormat, suffix := utils.IsBase64ImageStr(respStr)
log.WithField("isBase64", isBase64).Info("resp str info")
if isBase64 {
log.WithField("taskId", taskMsg.TaskId).WithField("format", respFormat).WithField("suffix", suffix).Info("Parse container resp")
queryString := utils.MatchFileCacheQueryString(taskParam.Headers, taskCmd.ImageName, t.DockerOp.ModelsInfo, respFormat)
ossUri, err := t.uploadOSS(taskMsg.TaskId, queryString, decodeByte, suffix)
log.WithField("taskId", op.taskMsg.TaskId).WithField("format", respFormat).WithField("suffix", suffix).Info("Parse container resp")
queryString := utils.MatchFileCacheQueryString(op.taskParam.Headers, op.taskCmd.ImageName, dockerOp.ModelsInfo, respFormat)
ossUri, err := op.uploadOSS(op.taskMsg.TaskId, queryString, decodeByte, suffix)
if err != nil || ossUri == "" {
log.WithError(err).Error("upload image into file cache failed")
return "", err
......@@ -612,98 +431,325 @@ func (t *TaskHandler) getFileCache(respStr string, taskMsg *nodeManagerV1.PushTa
return "", nil
}
func (t *TaskHandler) checkLastTaskExecStatus(taskMsg *nodeManagerV1.PushTaskMessage) {
if taskMsg.TaskKind == baseV1.TaskKind_ComputeTask {
t.IsExecAiTask = true
if t.IsExecStandardTask {
//todo: 停止标准任务容器
//containers := t.DockerOp.ListContainer()
//for _, container := range containers {
// if container.Image == taskCmd.ImageName && container.State == "running" {
// t.DockerOp.StopContainer(container.ID)
// }
//}
t.IsExecStandardTask = false
}
} else if taskMsg.TaskKind == baseV1.TaskKind_StandardTask {
t.IsExecStandardTask = true
}
if t.oldTaskId != taskMsg.TaskId {
now := time.Now()
for {
since := time.Since(now)
if int64(since.Seconds()) > conf.GetConfig().WaitLastTaskExecTime {
log.WithField("taskId", t.oldTaskId).Info("Waiting for last task execution ending")
t.oldTaskId = taskMsg.TaskId
break
}
if t.oldTaskId == "" {
t.oldTaskId = taskMsg.TaskId
break
}
value, ok := t.ExecTaskIdIsSuccess.Load(t.oldTaskId)
//log.WithField("isSuccess", value).Info("Task id exec info")
if !ok {
//log.WithField("task id", t.oldTaskId).Warn("task exec is not finished")
continue
}
isSuccess := value.(bool)
if isSuccess {
t.oldTaskId = taskMsg.TaskId
log.WithField("taskId", t.oldTaskId).Info("Task exec success")
break
}
}
}
}
func (t *TaskHandler) checkContainerHealthy(internalIp string, internalPort uint16, taskMsg *nodeManagerV1.PushTaskMessage, taskExecResult *models.TaskResult) bool {
func (op *TaskOp) checkContainerHealthy(internalIp string, internalPort uint16) error {
healthCheckUrl := fmt.Sprintf("http://%s:%d%s", internalIp, internalPort, models.HealthCheckAPI)
healthyCheckResp, err := t.HttpClient.Get(healthCheckUrl)
healthyCheckResp, err := op.httpClient.Get(healthCheckUrl)
if err != nil {
log.Errorf("Request container healthy failed: %s", err.Error())
taskExecResult.TaskExecError = fmt.Sprintf("%s,%s", "Request container healthy failed", err.Error())
return false
return fmt.Errorf("%s-%s", "The container is not ready", err)
}
if healthyCheckResp.StatusCode == http.StatusNotFound {
return true
return nil
}
body, err := io.ReadAll(healthyCheckResp.Body)
m := &models.HealthyCheck{}
err = json.Unmarshal(body, m)
if err != nil {
log.Errorf("Json unmarshal container healthy body failed: %s", err.Error())
taskExecResult.TaskExecError = fmt.Sprintf("%s,%s", "Json unmarshal container healthy body failed", err.Error())
return false
return fmt.Errorf("%s,%s", "Json unmarshal container healthy body failed", err.Error())
}
if m.Status != models.READY {
log.Errorf("The container is not ready")
taskExecResult.TaskExecError = fmt.Sprintf("%s", "The container is not ready")
return false
return fmt.Errorf("%s", "The container is not ready")
}
return true
return nil
}
func (t *TaskHandler) checkIsStopContainer(taskCmd *models.TaskCmd) {
if t.oldTaskImageName != "" && t.oldTaskImageName != taskCmd.ImageName {
//todo: 停止标准任务容器
containers := t.DockerOp.ListContainer()
for _, container := range containers {
split := strings.Split(container.Image, ":")
if len(split) == 1 {
container.Image = fmt.Sprintf("%s:%s", container.Image, "latest")
func (op *TaskOp) waitContainerRunning(handler *TaskWorker, imageId string) error {
maxExecTime, err := strconv.ParseInt(op.taskParam.Headers[models.MaxExecTime][0], 10, 64)
if err != nil {
log.Errorf("%s-%s", "Parse max exec time", err.Error())
return fmt.Errorf("%s-%s", "Parse max exec time", err.Error())
}
if maxExecTime == 0 {
maxExecTime = models.DefaultMaxExecTime
}
for {
select {
case <-op.ticker.C:
if int64(time.Since(op.startBeforeTaskTime).Seconds()) > maxExecTime-50 {
log.Errorf("%s", "The maximum execution time for this task has been exceeded")
return fmt.Errorf("%s", "The maximum execution time for this task has been exceeded")
}
log.WithField("containerImageName", container.Image).WithField("t.oldTaskImageName", t.oldTaskImageName).Info("match image")
if container.Image == t.oldTaskImageName && container.State == "running" {
t.DockerOp.StopContainer(container.ID)
log.WithField("Image name", container.Image).Info("Stopping container")
//t.DockerOp.RunningImages[t.oldTaskImageName] = false
running, internalIp, internalPort := handler.foundImageIsRunning(imageId)
if !running {
continue
}
if isMatch := strings.HasPrefix(op.taskCmd.ImageName, models.ReplicateImageNameSuffix); isMatch {
if err := op.checkContainerHealthy(internalIp, internalPort); err != nil {
log.WithField("err", err).Errorf("check container healthy failed")
return fmt.Errorf("%s-%s", "check container healthy failed", err.Error())
}
}
op.taskCmd.ApiUrl = fmt.Sprintf("http://%s:%d%s", internalIp, internalPort, op.taskCmd.ApiUrl)
log.Info("Container ports:", internalPort)
log.WithField("ApiUrl", op.taskCmd.ApiUrl).Info("The image is not running")
return nil
}
}
}
func (op *TaskOp) validateWebHook() error {
for key, value := range op.taskParam.Headers {
if key == models.Prefer {
if value[0] == models.Async {
op.request.Header.Set(models.Prefer, models.Async)
m := &models.ContainerRequest{}
err := json.Unmarshal(op.taskParam.Body, m)
if err != nil {
log.WithError(err).Error("json unmarshal task body failed")
return fmt.Errorf("%s,%s", "Json unmarshal task body failed", err.Error())
}
if m.WebHook == "" {
log.Error("Request webhook is nil")
return fmt.Errorf("%s", "Request webhook is nil")
} else {
_, err := url.Parse(m.WebHook)
if err != nil {
log.WithError(err).Error("web hook url parse failed")
return fmt.Errorf("%s,%s", "Web hook url parse failed", err.Error())
}
}
break
}
}
t.oldTaskImageName = taskCmd.ImageName
} else {
t.oldTaskImageName = taskCmd.ImageName
}
return nil
}
// waitReqContainerOk sends op.request to the task container on every tick of
// op.ticker until the container returns a final answer or the task's
// execution budget is used up, recording the outcome in op.taskExecResult.
//
// The budget (in seconds) is read from the models.MaxExecTime task header; a
// missing header or a value of 0 falls back to models.DefaultMaxExecTime. A
// 409 Conflict response is treated as "container busy" and retried on the next
// tick; any other non-200 status is returned as an error.
//
// dockerOp is forwarded to op.getFileCache when container outputs are cached.
//
// Returns nil once a 200 response has been fully processed, otherwise an error
// describing why the task could not be completed.
func (op *TaskOp) waitReqContainerOk(dockerOp *operate.DockerOp) error {
	maxExecTime := int64(models.DefaultMaxExecTime)
	// Only index the header when it is actually present: a task without the
	// header uses the default budget instead of panicking.
	if values := op.taskParam.Headers[models.MaxExecTime]; len(values) > 0 {
		parsed, err := strconv.ParseInt(values[0], 10, 64)
		if err != nil {
			log.Errorf("%s-%s", "Parse max exec time", err.Error())
			return fmt.Errorf("%s-%s", "Parse max exec time", err.Error())
		}
		if parsed != 0 {
			maxExecTime = parsed
		}
	}

	retrying := false
	for {
		<-op.ticker.C
		// Give up once fewer than 50 seconds of the budget remain.
		// NOTE(review): the 50s margin is presumably headroom for reporting
		// the result back — confirm with the caller.
		if int64(time.Since(op.startBeforeTaskTime).Seconds()) > maxExecTime-50 {
			log.Errorf("%s", "The maximum execution time for this task has been exceeded")
			return fmt.Errorf("%s", "The maximum execution time for this task has been exceeded")
		}
		// The previous attempt consumed the request body; obtain a fresh copy
		// before re-sending so the retry does not go out with an empty body.
		if retrying && op.request.GetBody != nil {
			body, err := op.request.GetBody()
			if err != nil {
				log.WithField("error:", err).Error("Http client post request container failed")
				return fmt.Errorf("%s,%s", "Http client post request container failed", err.Error())
			}
			op.request.Body = body
		}
		post, err := op.httpClient.Do(op.request)
		if err != nil {
			log.WithField("error:", err).Error("Http client post request container failed")
			return fmt.Errorf("%s,%s", "Http client post request container failed", err.Error())
		}
		log.WithField("StatusCode", post.StatusCode).WithField("taskId", op.taskMsg.TaskId).Info("Exec task result")

		retry, err := op.handleContainerResponse(post, dockerOp)
		if err != nil {
			return err
		}
		if !retry {
			return nil
		}
		retrying = true
	}
}

// handleContainerResponse processes one container response and always closes
// its body. It reports retry=true when the container answered 409 Conflict
// (already running a prediction) and the request should be sent again.
func (op *TaskOp) handleContainerResponse(post *http.Response, dockerOp *operate.DockerOp) (bool, error) {
	if post.Body != nil {
		// Closing the body on every path releases the underlying connection.
		defer post.Body.Close()
	}

	if post.StatusCode != http.StatusOK {
		op.taskExecResult.TaskHttpStatusCode = int32(post.StatusCode)
		if op.taskExecResult.TaskHttpStatusCode == http.StatusConflict {
			log.Errorf("%s,Container Http Code:%d", "Already running a prediction", post.StatusCode)
			if post.Body != nil {
				// Best-effort drain so the connection can be reused for the retry.
				_, _ = io.Copy(io.Discard, post.Body)
			}
			return true, nil
		}
		log.WithField("taskId", op.taskMsg.TaskId).Error("Exec task result is failed")
		if post.Body != nil {
			// A read error is ignored on purpose: the body is only used to
			// enrich the error message.
			all, _ := io.ReadAll(post.Body)
			return false, fmt.Errorf("%s,Container Http Code:%d,body:%s", "Container is exec failed", post.StatusCode, string(all))
		}
		return false, fmt.Errorf("%s,Container Http Code:%d,body:%s", "Container resp body is nil", post.StatusCode, "")
	}

	op.taskExecResult.TaskHttpStatusCode = http.StatusOK
	readBody, err := io.ReadAll(post.Body)
	if err != nil {
		log.Errorf("%s,%s,Container Http Code:%d", "Read container body failed", err.Error(), post.StatusCode)
		return false, fmt.Errorf("%s,%s,Container Http Code:%d", "Read container body failed", err.Error(), post.StatusCode)
	}

	if op.taskMsg.TaskKind == baseV1.TaskKind_StandardTask {
		// Standard tasks return the container body untouched.
		op.taskExecResult.TaskRespBody = readBody
	} else if err := op.fillAiTaskRespBody(readBody, post, dockerOp); err != nil {
		return false, err
	}

	headers, err := json.Marshal(post.Header)
	if err != nil {
		log.WithError(err).Error("JSON marshal container header failed")
		return false, fmt.Errorf("%s,%s", "JSON marshal container header failed", err.Error())
	}
	log.WithField("headers", post.Header).Info("return task http headers")
	op.taskExecResult.TaskHttpHeaders = headers
	op.taskExecResult.TaskIsSuccess = true
	return false, nil
}

// fillAiTaskRespBody converts the body of a non-standard task into the task
// response body, honouring the models.UseFileCache and models.UseRedirect task
// headers, and stores it in op.taskExecResult.
func (op *TaskOp) fillAiTaskRespBody(readBody []byte, post *http.Response, dockerOp *operate.DockerOp) error {
	isUseFileCache := true
	isUseRedirect := false
	// Every header is inspected (no early exit) so the two flags do not depend
	// on the random iteration order of the header map.
	for key, value := range op.taskParam.Headers {
		log.WithField("key", key).WithField("val", value).Debug("Headers Info")
		if len(value) == 0 {
			continue
		}
		if key == models.UseRedirect {
			log.WithField("UseRedirect", value[0]).Info("Headers info")
			if value[0] == "true" {
				isUseRedirect = true
			}
		}
		if key == models.UseFileCache {
			log.WithField("UseFileCache", value[0]).Info("Headers info")
			if value[0] == "false" {
				isUseFileCache = false
			}
		}
	}
	log.WithField("isUseRedirect", isUseRedirect).Info("is use redirect")
	log.WithField("isUseFileCache", isUseFileCache).Info("is use file cache")

	data := parseData(readBody)
	if data == nil {
		log.Error("Container resp output is nil")
		op.taskExecResult.TaskRespBody = readBody
		return fmt.Errorf("%s", "Container resp output is nil")
	}

	var flat []string
	switch v := data.(type) {
	case [][]string:
		log.Info("data is [][]string type")
		// NOTE(review): length 1 means the encoded result starts with a nil
		// entry; kept as-is because consumers may rely on the current shape.
		apiRes := make([][]string, 1)
		// A redirect is only possible for exactly one output overall.
		allowRedirect := isUseRedirect && len(v) == 1
		isSuccess, redirected := false, false
		for _, innerSlice := range v {
			outputs, redir, ok := op.collectContainerOutputs(innerSlice, isUseFileCache, allowRedirect, post, dockerOp)
			apiRes = append(apiRes, outputs)
			redirected = redirected || redir
			isSuccess = isSuccess || ok
		}
		if !isSuccess {
			return fmt.Errorf("%s-%s", "Container output is nil", string(readBody))
		}
		// Write the collected outputs unless a redirect already set the body;
		// a requested-but-not-performed redirect must not lose the outputs.
		if !redirected {
			op.taskExecResult.TaskRespBody = utils.EncodeJsonEscapeHTML(apiRes)
		}
		return nil
	case []string:
		log.Info("data is []string type")
		flat = v
	case string:
		log.Info("data is string type")
		flat = []string{v}
	default:
		log.Error("data is unknown type", v)
		return fmt.Errorf("%s", "Container resp data is unknown type")
	}

	apiRes, redirected, isSuccess := op.collectContainerOutputs(flat, isUseFileCache, isUseRedirect, post, dockerOp)
	if !isSuccess {
		return fmt.Errorf("%s-%s", "Container output is nil", string(readBody))
	}
	if !redirected {
		op.taskExecResult.TaskRespBody = utils.EncodeJsonEscapeHTML(apiRes)
	}
	return nil
}

// collectContainerOutputs walks one list of container outputs, skipping empty
// and "null" entries. With file caching enabled each entry is passed to
// op.getFileCache and replaced by the returned URI; an entry whose URI comes
// back empty is kept verbatim, and an entry that yields both a URI and an
// error is dropped. When allowRedirect is set and the list holds exactly one
// entry, the result is turned into a redirect (status models.RedirectCode,
// Location header, body set to the URI) and redirected is reported as true.
//
// It returns the collected outputs (never nil), whether a redirect was
// written, and whether at least one usable output was found.
func (op *TaskOp) collectContainerOutputs(items []string, useFileCache, allowRedirect bool, post *http.Response, dockerOp *operate.DockerOp) ([]string, bool, bool) {
	outputs := make([]string, 0)
	found := false
	for _, respStr := range items {
		if respStr == "" || respStr == "null" {
			continue
		}
		if !useFileCache {
			outputs = append(outputs, respStr)
			found = true
			continue
		}
		ossUri, err := op.getFileCache(respStr, dockerOp)
		if err != nil {
			op.taskExecResult.TaskExecError = fmt.Sprintf("%s,%s", "Get file cache uri failed", err)
		}
		if ossUri == "" {
			// No cached URI available: fall back to the raw container output.
			outputs = append(outputs, respStr)
			found = true
			continue
		}
		if err != nil {
			continue
		}
		if allowRedirect && len(items) == 1 {
			op.taskExecResult.TaskHttpStatusCode = models.RedirectCode
			post.Header.Set("Location", ossUri)
			op.taskExecResult.TaskRespBody = utils.EncodeJsonEscapeHTML(ossUri)
			return outputs, true, true
		}
		outputs = append(outputs, ossUri)
		found = true
	}
	return outputs, false, found
}
......
......@@ -111,12 +111,11 @@ func TestTaskHandler_computeTaskHandler(t1 *testing.T) {
}
for _, tt := range tests {
t1.Run(tt.name, func(t1 *testing.T) {
t := &nm.TaskHandler{
Wg: tt.fields.wg,
LruCache: tt.fields.lruCache,
DockerOp: tt.fields.DockerOp,
TaskMsg: tt.fields.TaskMsg,
HttpClient: tt.fields.HttpClient,
t := &nm.TaskWorker{
Wg: tt.fields.wg,
LruCache: tt.fields.lruCache,
DockerOp: tt.fields.DockerOp,
TaskMsg: tt.fields.TaskMsg,
}
tt.fields.wg.Add(1)
t.ComputeTaskHandler(tt.args.taskMsg)
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment