Commit 1c4d4744 authored by duanjinfei's avatar duanjinfei

pull image and container sign

parent 2ada1ade
......@@ -10,4 +10,6 @@ const (
ContentType = "type"
RedirectCode = 303
UseFileCache = "USE-FILE-CACHE"
ModelPublishStatusYes = 1
ModelPublishStatusNo = 2
)
......@@ -66,6 +66,7 @@ type ModelInfo struct {
ImageName string `json:"image_name"`
//OutPutJson string `json:"out_put_json"`
FileExpiresTime string `json:"file_expires_time"`
PublishStatus int `json:"publish_status"`
}
type ComputeResult struct {
......
......@@ -63,10 +63,8 @@ func monitorModelInfo(dockerOp *operate.DockerOp) {
// todo: 判断机器资源是否够用
isPull := isResourceEnough(modelInfo)
// todo: 如果够用
if isPull {
if isPull && modelInfo.PublishStatus == models.ModelPublishStatusYes {
go dockerOp.PullImage(modelInfo)
// todo: 是否立马上报数据
// reportTaskIds = append(reportTaskIds, modelInfo.TaskId)
}
} else {
reportTaskIds = append(reportTaskIds, modelInfo.TaskId)
......@@ -101,5 +99,5 @@ func reportModelInfo(nodeManager *models.NodeManagerClient,
}
func isResourceEnough(modelInfo *models.ModelInfo) bool {
return false
return true
}
......@@ -19,6 +19,7 @@ import (
"mime/multipart"
"net/http"
"strconv"
"strings"
"sync"
"time"
)
......@@ -122,12 +123,16 @@ func (t *TaskHandler) ComputeTaskHandler(taskMsg *nodeManagerV1.PushTaskMessage)
}
if oldTaskImageName != "" && oldTaskImageName != taskCmd.ImageName {
//todo: 停止标准任务容器
//containers := t.DockerOp.ListContainer()
//for _, container := range containers {
// if container.Image == taskCmd.ImageName && container.State == "running" {
// t.DockerOp.StopContainer(container.ID)
// }
//}
containers := t.DockerOp.ListContainer()
for _, container := range containers {
split := strings.Split(container.Image, ":")
if len(split) == 1 {
container.Image = fmt.Sprintf("%s:%s", container.Image, "latest")
}
if container.Image == taskCmd.ImageName && container.State == "running" {
t.DockerOp.StopContainer(container.ID)
}
}
oldTaskImageName = taskCmd.ImageName
}
images, err := t.DockerOp.PsImages()
......@@ -170,17 +175,13 @@ func (t *TaskHandler) ComputeTaskHandler(taskMsg *nodeManagerV1.PushTaskMessage)
}
taskCmd.DockerCmd.HostIp = "0.0.0.0"
taskCmd.DockerCmd.HostPort = strconv.FormatInt(externalPort, 10)
//if int64(len(containers)) == conf.GetConfig().ContainerNum {
// //todo: 待定,需要根据权重去停止哪个容器
// t.DockerOp.StopAndDeleteContainer(containers[0].ID)
//}
containerId, err := t.DockerOp.CreateAndStartContainer(taskCmd.ImageName, taskCmd.DockerCmd)
if err != nil {
log.Errorf("Create and start container failed: %s", err.Error())
return
}
log.Infof("Started container with ID %s", containerId)
time.Sleep(time.Second * 20)
time.Sleep(time.Second * 40)
running, internalIp, internalPort = t.foundImageIsRunning(imageId)
if running {
taskCmd.ApiUrl = fmt.Sprintf("http://%s:%d%s", internalIp, internalPort, taskCmd.ApiUrl)
......
......@@ -55,15 +55,9 @@ func NewDockerOp() *DockerOp {
}
func (d *DockerOp) GetContainerSign(taskMsg *nodemanagerv1.PushTaskMessage, taskRes []byte) []byte {
taskParam := &models.TaskParam{}
err := json.Unmarshal(taskMsg.TaskParam, taskParam)
if err != nil {
log.WithField("err", err).Error("Error unmarshalling task parameter")
return nil
}
reqBody := &models.TaskReq{
TaskId: taskMsg.TaskId,
TaskParam: bytes.NewBuffer(taskParam.Body).String(),
TaskParam: bytes.NewBuffer(taskMsg.TaskParam).String(),
TaskResult: bytes.NewBuffer(taskRes).String(),
}
body, err := json.Marshal(reqBody)
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment