Commit ba4b8d09 authored by duanjinfei's avatar duanjinfei

Merge branch 'master' into test

parents 691e48e4 6aa1d584
...@@ -8,19 +8,22 @@ import ( ...@@ -8,19 +8,22 @@ import (
"example.com/m/operate" "example.com/m/operate"
"io" "io"
"net/http" "net/http"
"os"
"strings" "strings"
"time" "time"
) )
type ModelHandler struct { type ModelHandler struct {
dockerOp *operate.DockerOp dockerOp *operate.DockerOp
client *http.Client client *http.Client
modelsFileName string
} }
func NewModelHandler(dockerOp *operate.DockerOp) *ModelHandler { func NewModelHandler(dockerOp *operate.DockerOp) *ModelHandler {
return &ModelHandler{ return &ModelHandler{
dockerOp: dockerOp, dockerOp: dockerOp,
client: &http.Client{}, client: &http.Client{},
modelsFileName: "models.json",
} }
} }
...@@ -84,11 +87,38 @@ func (m *ModelHandler) MonitorModelInfo() { ...@@ -84,11 +87,38 @@ func (m *ModelHandler) MonitorModelInfo() {
} }
m.dockerOp.ModelsInfo = modelInfosResp m.dockerOp.ModelsInfo = modelInfosResp
m.dockerOp.ReportTaskIds = reportTaskIds m.dockerOp.ReportTaskIds = reportTaskIds
err = os.WriteFile(m.modelsFileName, bodyBytes, 0644)
if err != nil {
log.WithError(err).Error("Error writing models.json")
}
ticker = time.NewTicker(time.Minute * 10) ticker = time.NewTicker(time.Minute * 10)
} }
} }
} }
func (m *ModelHandler) ReadModels() ([]*models.ModelInfo, error) {
bodyBytes, err := os.ReadFile(m.modelsFileName)
if err != nil {
log.WithError(err).WithField("fileName", m.modelsFileName).Error("Error reading")
return nil, err
}
resp := &models.Resp{}
err = json.Unmarshal(bodyBytes, resp)
if err != nil {
log.WithField("fileName", m.modelsFileName).Error("Unmarshal model response failed:", err)
return nil, err
}
if resp.Code != http.StatusOK {
log.WithField("fileName", m.modelsFileName).Error("Response code :", resp.Code)
return nil, err
}
if resp.Data == nil || len(resp.Data) == 0 {
log.WithField("fileName", m.modelsFileName).Warn("Response data is empty")
return nil, err
}
return resp.Data, nil
}
func isResourceEnough(modelInfo *models.ModelInfo) bool { func isResourceEnough(modelInfo *models.ModelInfo) bool {
return true return true
} }
...@@ -132,7 +132,7 @@ func (t *TaskHandler) ComputeTaskHandler(taskMsg *nodeManagerV1.PushTaskMessage) ...@@ -132,7 +132,7 @@ func (t *TaskHandler) ComputeTaskHandler(taskMsg *nodeManagerV1.PushTaskMessage)
t.checkIsStopContainer(taskCmd) t.checkIsStopContainer(taskCmd)
} }
log.Info("check is stop container finished") log.Info("check is stop container finished")
isFound, imageId := t.foundTaskImage(taskExecResult, taskCmd, taskMsg) isFound, imageId := t.foundTaskImage(taskCmd)
log.Info("found task image finished") log.Info("found task image finished")
if !isFound || imageId == "" { if !isFound || imageId == "" {
log.Error("The image is not found:", taskCmd.ImageName) log.Error("The image is not found:", taskCmd.ImageName)
...@@ -170,6 +170,7 @@ func (t *TaskHandler) ComputeTaskHandler(taskMsg *nodeManagerV1.PushTaskMessage) ...@@ -170,6 +170,7 @@ func (t *TaskHandler) ComputeTaskHandler(taskMsg *nodeManagerV1.PushTaskMessage)
isMatch := strings.HasPrefix(taskCmd.ImageName, models.ReplicateImageNameSuffix) isMatch := strings.HasPrefix(taskCmd.ImageName, models.ReplicateImageNameSuffix)
if isMatch { if isMatch {
if !t.checkContainerHealthy(internalIp, internalPort, taskMsg, taskExecResult) { if !t.checkContainerHealthy(internalIp, internalPort, taskMsg, taskExecResult) {
t.ExecTaskIdIsSuccess.Store(taskMsg.TaskId, true)
return return
} }
} }
...@@ -495,12 +496,10 @@ func (t *TaskHandler) CustomTaskHandler(taskMsg *nodeManagerV1.PushTaskMessage) ...@@ -495,12 +496,10 @@ func (t *TaskHandler) CustomTaskHandler(taskMsg *nodeManagerV1.PushTaskMessage)
log.Info("received customTask--------------------------------") log.Info("received customTask--------------------------------")
} }
func (t *TaskHandler) foundTaskImage(taskExecResult *models.TaskResult, taskCmd *models.TaskCmd, taskMsg *nodeManagerV1.PushTaskMessage) (isSuccess bool, imageId string) { func (t *TaskHandler) foundTaskImage(taskCmd *models.TaskCmd) (isSuccess bool, imageId string) {
images, err := t.DockerOp.PsImages() images, err := t.DockerOp.PsImages()
if err != nil { if err != nil {
log.Error("Ps images failed:", err) log.Error("Ps images failed:", err)
taskExecResult.TaskExecError = fmt.Sprintf("%s,%s", "Ps images failed:", err.Error())
t.ExecTaskIdIsSuccess.Store(taskMsg.TaskId, true)
isSuccess = false isSuccess = false
imageId = "" imageId = ""
return return
...@@ -663,7 +662,6 @@ func (t *TaskHandler) checkContainerHealthy(internalIp string, internalPort uint ...@@ -663,7 +662,6 @@ func (t *TaskHandler) checkContainerHealthy(internalIp string, internalPort uint
if err != nil { if err != nil {
log.Errorf("Request container healthy failed: %s", err.Error()) log.Errorf("Request container healthy failed: %s", err.Error())
taskExecResult.TaskExecError = fmt.Sprintf("%s,%s", "Request container healthy failed", err.Error()) taskExecResult.TaskExecError = fmt.Sprintf("%s,%s", "Request container healthy failed", err.Error())
t.ExecTaskIdIsSuccess.Store(taskMsg.TaskId, true)
return false return false
} }
if healthyCheckResp.StatusCode == http.StatusNotFound { if healthyCheckResp.StatusCode == http.StatusNotFound {
...@@ -675,13 +673,11 @@ func (t *TaskHandler) checkContainerHealthy(internalIp string, internalPort uint ...@@ -675,13 +673,11 @@ func (t *TaskHandler) checkContainerHealthy(internalIp string, internalPort uint
if err != nil { if err != nil {
log.Errorf("Json unmarshal container healthy body failed: %s", err.Error()) log.Errorf("Json unmarshal container healthy body failed: %s", err.Error())
taskExecResult.TaskExecError = fmt.Sprintf("%s,%s", "Json unmarshal container healthy body failed", err.Error()) taskExecResult.TaskExecError = fmt.Sprintf("%s,%s", "Json unmarshal container healthy body failed", err.Error())
t.ExecTaskIdIsSuccess.Store(taskMsg.TaskId, true)
return false return false
} }
if m.Status != models.READY { if m.Status != models.READY {
log.Errorf("The container is not ready") log.Errorf("The container is not ready")
taskExecResult.TaskExecError = fmt.Sprintf("%s", "The container is not ready") taskExecResult.TaskExecError = fmt.Sprintf("%s", "The container is not ready")
t.ExecTaskIdIsSuccess.Store(taskMsg.TaskId, true)
return false return false
} }
return true return true
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment