Commit e42dfefb authored by duanjinfei

update model info

parent e7ea8956
@@ -20,7 +20,7 @@ var (
 )
 
 func init() {
-	RootCmd.PersistentFlags().StringVarP(&rewardAddr, "reward", "r", "0x0Fb196385c8826e3806ebA2cA2cb78B26E08fEEe", "please enter a reward address")
+	RootCmd.PersistentFlags().StringVarP(&rewardAddr, "reward", "r", "0x2E60C056fBAf4bf27945516c9364B037D5D31CC2", "please enter a reward address")
 	RootCmd.PersistentFlags().StringVarP(&externalIp, "externalIp", "e", "192.168.1.120", "please enter server external ip address")
 	RootCmd.PersistentFlags().StringVarP(&opSys, "opSys", "s", "", "please enter you op sys name : win、linux")
 	RootCmd.PersistentFlags().BoolVarP(&debug, "debug", "d", false, "set log level debug")
...
@@ -145,6 +145,7 @@ func (m *ModelHandler) MonitorModelStatus() {
 		for _, key := range keys {
 			model, _ := db.GetModel(key)
 			if model != nil && !model.IsInstalled {
+				model.ImageId = image.ID
 				model.SetupTime = time.Now().Unix()
 				model.IsInstalled = true
 				err := db.PutModel(key, model)
@@ -155,26 +156,26 @@ func (m *ModelHandler) MonitorModelStatus() {
 				}
 			}
 		}
-		//containerList := m.dockerOp.ListContainer()
-		//if containerList != nil && len(containerList) > 0 {
-		//	for _, container := range containerList {
-		//		key := container.Image
-		//		model, err := db.GetModel(key)
-		//		if err != nil || model == nil {
-		//			continue
-		//		}
-		//		if container.State == "running" && !model.IsRunning {
-		//			model.ContainerId = container.ID
-		//			model.LastRunTime = time.Now().Unix()
-		//			model.IsRunning = true
-		//			err = db.PutModel(key, model)
-		//			if err != nil {
-		//				continue
-		//			}
-		//		}
-		//	}
-		//	}
-		//}
+		containerList := m.dockerOp.ListContainer()
+		if containerList != nil && len(containerList) > 0 {
+			for _, container := range containerList {
+				key := container.Image
+				model, err := db.GetModel(key)
+				if err != nil || model == nil {
+					continue
+				}
+				if container.State == "running" && !model.IsRunning {
+					model.ContainerId = container.ID
+					model.LastRunTime = time.Now().Unix()
+					model.IsRunning = true
+					err = db.PutModel(key, model)
+					if err != nil {
+						continue
+					}
+				}
+			}
+		}
+		ticker = time.NewTicker(time.Minute * 10)
 	}
 }
...
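The uncommented block above reconciles the locally stored model records with what Docker actually reports: any model whose image backs a running container gets its ContainerId, LastRunTime and IsRunning flag refreshed. A minimal sketch of that reconciliation step, using simplified stand-in types and a plain map in place of the repository's db package (names are assumptions, not the project's real API):

package main

import (
	"fmt"
	"time"
)

// Simplified stand-ins for the container and model records; field names
// mirror the ones used in the loop above.
type Container struct {
	ID    string
	Image string
	State string
}

type Model struct {
	ContainerId string
	LastRunTime int64
	IsRunning   bool
}

// syncRunningContainers marks a model as running when a container built from
// its image is reported as "running", mirroring the uncommented loop above.
// Here the model store is just a map keyed by image name.
func syncRunningContainers(models map[string]*Model, containers []Container) {
	for _, c := range containers {
		model, ok := models[c.Image]
		if !ok || model == nil {
			continue
		}
		if c.State == "running" && !model.IsRunning {
			model.ContainerId = c.ID
			model.LastRunTime = time.Now().Unix()
			model.IsRunning = true
		}
	}
}

func main() {
	models := map[string]*Model{"demo-image": {}}
	syncRunningContainers(models, []Container{{ID: "c1", Image: "demo-image", State: "running"}})
	fmt.Printf("%+v\n", *models["demo-image"])
}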
@@ -27,4 +27,7 @@ const (
 	BasicMode  = 1
 	HealthMode = 2
 	SaveMode   = 3
+	OneHour    = 1
+	OneMinutes = 1
+	TwoMinutes = 2
 )
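The three new constants are bare integers rather than time.Duration values; in the monitor loops further down they are compared against time.Since(...).Hours() or time.Since(...).Minutes(), so each constant only makes sense next to the matching accessor. A small, self-contained illustration of that comparison pattern (constant values copied from the hunk above, the elapsed time is invented for the demo):

package main

import (
	"fmt"
	"time"
)

// Copied from the const block above: plain numbers, not time.Duration.
const (
	OneHour    = 1
	OneMinutes = 1
	TwoMinutes = 2
)

func main() {
	start := time.Now().Add(-90 * time.Second) // pretend the operation began 90s ago

	if time.Since(start).Hours() > OneHour {
		fmt.Println("one-hour window exceeded") // does not fire: 0.025h
	}
	if time.Since(start).Minutes() > OneMinutes {
		fmt.Println("one-minute window exceeded") // fires: 1.5min > 1
	}
	if time.Since(start).Minutes() > TwoMinutes {
		fmt.Println("two-minute window exceeded") // does not fire
	}
}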
@@ -18,10 +18,11 @@ type TaskCmd struct {
 }
 
 type DockerCmd struct {
 	ContainerPort int64 `json:"container_port"`
-	EnvMap        map[string]string
-	HostIp        string
-	HostPort      string
+	RunningBeforeMem int64
+	EnvMap           map[string]string
+	HostIp           string
+	HostPort         string
 }
 
 type TaskReq struct {
@@ -147,7 +148,6 @@ type ModelInfo struct {
 	EstimatExeTime int32 `json:"estimat_exe_time"`
 	StartUpTime    int64 `json:"start_up_time"`
 	RunningMem     int64 `json:"running_mem"`
-	OpTime         int64
 	SetupTime      int64
 	LastRunTime    int64
 	ImageId        string
@@ -155,7 +155,6 @@ type ModelInfo struct {
 	IsInstalled   bool
 	IsRunning     bool
 	GpuSeq        int32
-	GpuRam        int64
 	LastWorkTime  int64
 	TotalRunCount int32
 }
...
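The new RunningBeforeMem field on DockerCmd appears to carry the GPU free-memory reading taken before the container starts; the handler below stashes it per image and later diffs it against the current free memory to fill ModelInfo.RunningMem. A minimal construction example, assuming the field is populated by whoever builds the DockerCmd (struct shape copied from the hunk above, values and the env key are invented for illustration; the real code uses models.CudaEnv):

package main

import "fmt"

type DockerCmd struct {
	ContainerPort    int64 `json:"container_port"`
	RunningBeforeMem int64 // free GPU memory observed before start-up (assumed meaning, units as reported by the hardware info)
	EnvMap           map[string]string
	HostIp           string
	HostPort         string
}

func main() {
	cmd := DockerCmd{
		ContainerPort:    8080,
		RunningBeforeMem: 20_000,                                      // made-up reading
		EnvMap:           map[string]string{"CUDA_VISIBLE_DEVICES": "0"}, // key name assumed
		HostIp:           "127.0.0.1",
		HostPort:         "18080",
	}
	fmt.Printf("%+v\n", cmd)
}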
@@ -14,6 +14,12 @@ import (
 	"time"
 )
 
+var modelRunningBeoforeMem map[string]int64
+
+func init() {
+	modelRunningBeoforeMem = make(map[string]int64, 0)
+}
+
 type NodeManagerHandler struct {
 	nodeManager *models.NodeManagerClient
 	worker      nodemanagerV2.NodeManagerService_RegisterWorkerClient
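modelRunningBeoforeMem is a plain package-level map: it is written in the operate-message handler below and read from the MonitorImageOp ticker loop. Plain Go maps are not safe for concurrent use, so if those two paths can run at the same time a guarded variant may be needed. A minimal sketch using sync.RWMutex, offered only as an editor's option and not part of the commit:

package main

import (
	"fmt"
	"sync"
)

// beforeMem is a possible concurrency-safe replacement for the package-level
// modelRunningBeoforeMem map; this is a sketch, not code from the commit.
type beforeMem struct {
	mu sync.RWMutex
	m  map[string]int64
}

func newBeforeMem() *beforeMem {
	return &beforeMem{m: make(map[string]int64)}
}

func (b *beforeMem) Set(image string, memFree int64) {
	b.mu.Lock()
	defer b.mu.Unlock()
	b.m[image] = memFree
}

func (b *beforeMem) Get(image string) (int64, bool) {
	b.mu.RLock()
	defer b.mu.RUnlock()
	v, ok := b.m[image]
	return v, ok
}

func main() {
	b := newBeforeMem()
	b.Set("demo-image", 20_000)
	if v, ok := b.Get("demo-image"); ok {
		fmt.Println("free memory before start:", v)
	}
}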
@@ -174,7 +180,7 @@ func (n *NodeManagerHandler) DistributionMsgWorker(nodeManagerMsgChan chan *node
             gpu := info.GPU
             isMatch := false
             for _, gpuInfo := range gpu {
-                if gpuInfo.MemFree > model.GpuRam {
+                if gpuInfo.MemFree > model.RunningMem {
                     envMap[models.CudaEnv] = strconv.FormatInt(int64(gpuInfo.Seq), 10)
                     isMatch = true
                     break
@@ -186,7 +192,7 @@ func (n *NodeManagerHandler) DistributionMsgWorker(nodeManagerMsgChan chan *node
                 continue
             }
             for _, modelInfo := range runningModel {
-                if modelInfo.RunningMem > model.GpuRam {
+                if modelInfo.RunningMem > model.RunningMem {
                     isMatch = true
                     dockerOp.StopContainer(model.ContainerId)
                     envMap[models.CudaEnv] = strconv.FormatInt(int64(modelInfo.GpuSeq), 10)
@@ -195,6 +201,9 @@ func (n *NodeManagerHandler) DistributionMsgWorker(nodeManagerMsgChan chan *node
                 }
             }
             if isMatch {
+                modelRunningBeoforeMem[model.ImageName] = dockerCmd.RunningBeforeMem
+                gpuSeq, _ := strconv.ParseInt(dockerCmd.EnvMap[models.CudaEnv], 10, 32)
+                model.GpuSeq = int32(gpuSeq)
                 _, err := dockerOp.CreateAndStartContainer(model.ImageName, dockerCmd)
                 if err != nil {
                     log.WithError(err).Error("Error creating container")
@@ -205,10 +214,15 @@ func (n *NodeManagerHandler) DistributionMsgWorker(nodeManagerMsgChan chan *node
         case nodemanagerV2.ModelOperateType_STOP:
             {
                 if model.ContainerId != "" {
+                    model.ContainerId = ""
                     dockerOp.StopContainer(model.ContainerId)
                 }
             }
         }
+        err = db.PutModel(model.ImageName, model)
+        if err != nil {
+            log.WithError(err).Error("Db put model failed")
+        }
     }
 }(modelOpMsg, n.taskMsgWorker.DockerOp)
 continue
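In the STOP branch above, ContainerId is cleared on the line before dockerOp.StopContainer(model.ContainerId) runs, so the stop call receives an empty ID. If the intent is to stop the current container and then forget it, a sketch of that ordering looks like the following; the helper, interface and types are the editor's assumptions, not code from the commit:

package main

import "fmt"

type Model struct{ ContainerId string }

// stopper covers just the piece of the Docker wrapper this step needs.
type stopper interface{ StopContainer(id string) }

type logStopper struct{}

func (logStopper) StopContainer(id string) { fmt.Println("stopping container", id) }

// stopAndClear stops the model's container by its current ID, then clears the
// field so the later db.PutModel records the model as having no container.
func stopAndClear(op stopper, m *Model) {
	if m.ContainerId == "" {
		return
	}
	op.StopContainer(m.ContainerId)
	m.ContainerId = ""
}

func main() {
	m := &Model{ContainerId: "abc123"}
	stopAndClear(logStopper{}, m)
	fmt.Println("container id after stop:", m.ContainerId)
}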
@@ -240,7 +254,6 @@ func (n *NodeManagerHandler) MonitorImageOp(op *nodemanagerV2.ModelOperate) {
 		log.WithError(err).Error("Op model - get model error")
 		return
 	}
-	model.OpTime = time.Now().Unix()
 	ticker := time.NewTicker(time.Second * 2)
 	defer ticker.Stop()
 	isOp := false
@@ -251,8 +264,8 @@ func (n *NodeManagerHandler) MonitorImageOp(op *nodemanagerV2.ModelOperate) {
 	for {
 		select {
 		case <-ticker.C:
-			if time.Since(now).Seconds() > 36000 || isOp {
-				break
+			if time.Since(now).Hours() > models.OneHour || isOp {
+				return
 			}
 			imagesMap, err := n.taskMsgWorker.DockerOp.PsImageNameMap()
 			if err != nil {
@@ -277,8 +290,8 @@ func (n *NodeManagerHandler) MonitorImageOp(op *nodemanagerV2.ModelOperate) {
 	for {
 		select {
 		case <-ticker.C:
-			if time.Since(now).Seconds() > 36000 || isOp {
-				break
+			if time.Since(now).Minutes() > models.OneMinutes || isOp {
+				return
 			}
 			imagesMap, err := n.taskMsgWorker.DockerOp.PsImageNameMap()
 			if err != nil {
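Both timeout checks above move from raw second counts to the named constants and from break to return. Inside a select statement, break only leaves the select, not the surrounding for loop, so the old break sent the loop straight back to waiting on the ticker and the deadline never actually ended it; return exits the monitor once the deadline passes or the operation has been observed. A minimal, self-contained reproduction of that difference (ticker period and deadline chosen only for the demo):

package main

import (
	"fmt"
	"time"
)

func main() {
	ticker := time.NewTicker(50 * time.Millisecond)
	defer ticker.Stop()
	now := time.Now()
	loops := 0

	for {
		select {
		case <-ticker.C:
			loops++
			if time.Since(now) > 200*time.Millisecond {
				// With `break` here only the select would end and the for
				// loop would keep ticking forever; `return` (or a labeled
				// break) is what actually stops the loop at the deadline.
				fmt.Println("deadline reached after", loops, "ticks")
				return
			}
		}
	}
}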
@@ -303,18 +316,33 @@ func (n *NodeManagerHandler) MonitorImageOp(op *nodemanagerV2.ModelOperate) {
 	for {
 		select {
 		case <-ticker.C:
-			if time.Since(now).Seconds() > 360 || isOp {
-				break
+			if time.Since(now).Minutes() > models.TwoMinutes || isOp {
+				return
 			}
+			info := getHardwareInfo()
+			memIsChange := false
+			for _, gpuInfo := range info.GPU {
+				if gpuInfo.Seq == model.GpuSeq {
+					if modelRunningBeoforeMem[op.ImageName] <= gpuInfo.MemFree {
+						break
+					}
+					model.RunningMem = modelRunningBeoforeMem[op.ImageName] - gpuInfo.MemFree
+					memIsChange = true
+				}
+			}
+			if !memIsChange {
+				continue
+			}
 			listContainers := n.taskMsgWorker.DockerOp.ListContainer()
 			if listContainers != nil && len(listContainers) > 0 {
 				for _, container := range listContainers {
 					if container.Image == op.ImageName {
 						isOp = true
-						model.ContainerId = ""
+						model.StartUpTime = int64(time.Since(now).Seconds())
+						model.ContainerId = container.ID
 						model.IsRunning = true
 						model.LastRunTime = time.Now().Unix()
-						params := utils.BuildParams(strconv.FormatUint(model.TaskId, 10), model.GpuSeq, model.GpuRam, model.LastRunTime, model.LastWorkTime, model.TotalRunCount, model.EstimatExeTime)
+						params := utils.BuildParams(strconv.FormatUint(model.TaskId, 10), model.GpuSeq, model.RunningMem, model.LastRunTime, model.LastWorkTime, model.TotalRunCount, model.EstimatExeTime)
 						n.msgRespWorker.RegisterMsgResp(n.nodeManager, n.worker, AddModelRunningResp, params)
 						break
 					}
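The new block above derives the model's RunningMem by comparing the free memory recorded before start-up (stashed in modelRunningBeoforeMem) with the GPU's current free memory, and skips the update when free memory has not dropped. A standalone sketch of that arithmetic with made-up numbers and a simplified GPU struct; units follow whatever the hardware info reports, and the names are assumptions:

package main

import "fmt"

type GPUInfo struct {
	Seq     int32
	MemFree int64
}

// usedSinceStart returns how much free memory the tracked GPU has lost since
// the "before start" reading, which the commit records as the model's
// RunningMem. The bool is false when free memory has not dropped yet.
func usedSinceStart(beforeFree int64, gpus []GPUInfo, gpuSeq int32) (int64, bool) {
	for _, g := range gpus {
		if g.Seq != gpuSeq {
			continue
		}
		if beforeFree <= g.MemFree {
			return 0, false // unchanged or higher: model presumably not loaded yet
		}
		return beforeFree - g.MemFree, true
	}
	return 0, false
}

func main() {
	gpus := []GPUInfo{{Seq: 0, MemFree: 14_000}}
	if used, ok := usedSinceStart(20_000, gpus, 0); ok {
		fmt.Println("model running memory:", used) // 6000 in this example
	}
}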
@@ -329,8 +357,8 @@ func (n *NodeManagerHandler) MonitorImageOp(op *nodemanagerV2.ModelOperate) {
 	for {
 		select {
 		case <-ticker.C:
-			if time.Since(now).Seconds() > 360 || isOp {
-				break
+			if time.Since(now).Minutes() > models.OneMinutes || isOp {
+				return
 			}
 			listContainers := n.taskMsgWorker.DockerOp.ListContainer()
 			if listContainers != nil && len(listContainers) > 0 {
...
@@ -155,6 +155,15 @@ func (t *TaskWorker) ComputeTaskHandler(taskMsg *nodemanagerV2.PushTaskMessage)
 		t.ExecTaskIdIsFinished.Store(taskMsg.TaskId, true)
 		return
 	}
+	model, err := db.GetModel(taskOp.taskCmd.ImageName)
+	if err != nil {
+		log.Errorf("failed to unmarshal task cmd: %s", err.Error())
+		taskOp.taskExecResult.TaskExecError = fmt.Sprintf("%s,%s", "Not found location model info: %s", err.Error())
+		t.ExecTaskIdIsFinished.Store(taskMsg.TaskId, true)
+		return
+	}
+	model.LastWorkTime = time.Now().Unix()
+	model.TotalRunCount++
 	taskOp.taskCmd.ImageName = fmt.Sprintf("%s-%s", taskOp.taskCmd.ImageName, conf.GetConfig().OpSys)
 	log.Info("received task cmd :", taskOp.taskCmd)
 	log.WithField("t.lastExecTaskImageName", t.lastExecTaskImageName).WithField("newTaskImageName", taskOp.taskCmd.ImageName).Info("task image info")
@@ -200,7 +209,9 @@ func (t *TaskWorker) ComputeTaskHandler(taskMsg *nodemanagerV2.PushTaskMessage)
 	} else if taskMsg.TaskKind == baseV1.TaskKind_StandardTask {
 		t.IsExecStandardTask = false
 	}
+	model.EstimatExeTime = int32(endAfterTaskTime.Seconds())
 	t.ExecTaskIdIsFinished.Store(taskMsg.TaskId, true)
+	_ = db.PutModel(taskOp.taskCmd.ImageName, model)
 	//log.WithField("result", taskExecResult).Info("lru cache storage task result")
 	log.Info("----------------------Compute task exec done--------------------------------")
 }
...
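The two hunks above bracket a compute task with model bookkeeping: the record is loaded by image name before execution, LastWorkTime and TotalRunCount are bumped up front, EstimatExeTime is filled from the measured duration afterwards, and the record is written back. Note that the write happens after the "-<opsys>" suffix has been appended to taskOp.taskCmd.ImageName, while the earlier read used the unsuffixed name, so the two keys differ. A compact sketch of the read-update-write pattern, with a simplified store standing in for the db package; the image name, store type and helper are the editor's assumptions:

package main

import (
	"fmt"
	"time"
)

// ModelInfo carries only the fields touched by the bookkeeping above;
// the real struct lives in the repository's models package.
type ModelInfo struct {
	LastWorkTime   int64
	TotalRunCount  int32
	EstimatExeTime int32
}

// store is a stand-in for db.GetModel / db.PutModel.
type store map[string]*ModelInfo

func (s store) GetModel(key string) (*ModelInfo, error) {
	m, ok := s[key]
	if !ok {
		return nil, fmt.Errorf("model %q not found", key)
	}
	return m, nil
}

func (s store) PutModel(key string, m *ModelInfo) error { s[key] = m; return nil }

func runTask(db store, imageName string, exec func()) error {
	model, err := db.GetModel(imageName)
	if err != nil {
		return err
	}
	model.LastWorkTime = time.Now().Unix()
	model.TotalRunCount++

	started := time.Now()
	exec() // the actual container / task execution happens here
	model.EstimatExeTime = int32(time.Since(started).Seconds())

	// Written back under the same key the record was read with.
	return db.PutModel(imageName, model)
}

func main() {
	db := store{"demo-image": {}}
	_ = runTask(db, "demo-image", func() { time.Sleep(10 * time.Millisecond) })
	fmt.Printf("%+v\n", db["demo-image"])
}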