Commit 9f9c2f73 authored by duanjinfei

add report model info

parent 3c7a1676
......@@ -67,7 +67,9 @@ func (m *MonitorNm) monitorNmClient() {
nodeManagerHandler := NewNodeManagerHandler(nodeManager, worker, msgRespWorker, taskMsgWorker)
log.Info("Report model info started")
go m.monitorModel(msgRespWorker, nodeManager, worker)
go m.monitorInstallModel(msgRespWorker, nodeManager, worker)
go m.monitorRunningModel(msgRespWorker, nodeManager, worker)
go nodeManagerHandler.MonitorStandardTaskWorker()
log.Info("Monitor standard task worker started")
......@@ -140,7 +142,7 @@ func (m *MonitorNm) monitorNodeManagerSeed() {
}
}
func (m *MonitorNm) monitorModel(msgRespWorker *RespMsgWorker, nodeManager *models.NodeManagerClient, worker nodemanagerV2.NodeManagerService_RegisterWorkerClient) {
func (m *MonitorNm) monitorInstallModel(msgRespWorker *RespMsgWorker, nodeManager *models.NodeManagerClient, worker nodemanagerV2.NodeManagerService_RegisterWorkerClient) {
reportModel := make(map[string]bool, 0)
images, err := m.DockerOp.PsImageNameMap()
if err != nil {
......@@ -187,3 +189,60 @@ func (m *MonitorNm) monitorModel(msgRespWorker *RespMsgWorker, nodeManager *mode
}
}
}
// monitorRunningModel reports to the node manager every known model whose
// image is used by a currently listed container, and then keeps re-evaluating
// that report on a timer: once after one second, then every ten minutes.
//
// It is started as a goroutine and never returns once the timer loop is
// entered. It returns early (after logging) only when no containers are listed
// or the model table cannot be read.
//
// Parameters:
//   - msgRespWorker: worker the report is handed to via RegisterMsgResp.
//   - nodeManager, worker: passed through unchanged to RegisterMsgResp.
//
// Each model image is reported at most once per invocation; the reported set
// is keyed by model.ImageName.
//
// NOTE(review): the container list and the model list are read once, before
// the loop, and are never refreshed. Every periodic report after the first one
// is therefore built from the same snapshot and ends up empty. That behaviour
// is preserved here on purpose; confirm whether the loop was meant to re-query
// Docker and the database (and whether that would double-report models already
// announced elsewhere) before changing it.
func (m *MonitorNm) monitorRunningModel(msgRespWorker *RespMsgWorker, nodeManager *models.NodeManagerClient, worker nodemanagerV2.NodeManagerService_RegisterWorkerClient) {
	containerList := m.DockerOp.ListContainer()
	// len of a nil slice is 0, so a separate nil check is unnecessary.
	if len(containerList) == 0 {
		log.Error("Get container failed")
		return
	}
	allModels, err := db.GetAllModels()
	if err != nil {
		log.WithError(err).Error("Get all models failed")
		return
	}

	// Index container images once so each model needs a single lookup instead
	// of a scan over every container.
	runningImages := make(map[string]struct{}, len(containerList))
	for _, container := range containerList {
		runningImages[container.Image] = struct{}{}
	}

	// reported remembers which images have already been announced so that a
	// later pass does not announce them again.
	reported := make(map[string]bool, len(allModels))

	// report builds the list of not-yet-announced running models and hands it
	// to the response worker. The message is sent even when the list is empty,
	// matching the previous behaviour.
	report := func() {
		pending := make([]interface{}, 0)
		for _, model := range allModels {
			if _, running := runningImages[model.ImageName]; !running || reported[model.ImageName] {
				continue
			}
			reported[model.ImageName] = true
			pending = append(pending, &nodemanagerV2.RunningModel{
				ModelId:       strconv.FormatUint(model.TaskId, 10),
				GpuSeq:        model.GpuSeq,
				GpuRam:        model.RunningMem,
				StartedTime:   model.LastRunTime,
				LastWorkTime:  model.LastWorkTime,
				TotalRunCount: model.TotalRunCount,
				ExecTime:      model.EstimatExeTime,
			})
		}
		params := utils.BuildParams(pending...)
		msgRespWorker.RegisterMsgResp(nodeManager, worker, AddModelRunningResp, params)
	}

	report()

	// A single timer is re-armed after every pass. The previous code created a
	// brand-new ticker on every tick without stopping the old one, and the
	// deferred Stop only ever applied to the very first ticker, so one ticker
	// leaked per iteration.
	timer := time.NewTimer(time.Second)
	defer timer.Stop()
	for range timer.C {
		report()
		// The channel has just been drained, so Reset is safe here.
		timer.Reset(10 * time.Minute)
	}
}
......@@ -342,7 +342,7 @@ func (n *NodeManagerHandler) MonitorImageOp(op *nodemanagerV2.ModelOperate) {
model.ContainerId = container.ID
model.IsRunning = true
model.LastRunTime = time.Now().Unix()
params := utils.BuildParams(strconv.FormatUint(model.TaskId, 10), model.GpuSeq, model.RunningMem, model.LastRunTime, model.LastWorkTime, model.TotalRunCount, model.EstimatExeTime)
params := utils.BuildParams(&nodemanagerV2.RunningModel{ModelId: strconv.FormatUint(model.TaskId, 10), GpuSeq: model.GpuSeq, GpuRam: model.RunningMem, StartedTime: model.LastRunTime, LastWorkTime: model.LastWorkTime, TotalRunCount: model.TotalRunCount, ExecTime: model.EstimatExeTime})
n.msgRespWorker.RegisterMsgResp(n.nodeManager, n.worker, AddModelRunningResp, params)
break
}
......
......@@ -327,16 +327,10 @@ func DelModelInstalledResp(params ...interface{}) *nodemanagerV2.WorkerMessage {
func AddModelRunningResp(params ...interface{}) *nodemanagerV2.WorkerMessage {
log.Info("Add model running response received params:", params)
runningModels := make([]*nodemanagerV2.RunningModel, 0)
model := &nodemanagerV2.RunningModel{
ModelId: params[0].(string),
GpuSeq: params[1].(int32),
GpuRam: params[2].(int64),
StartedTime: params[3].(int64),
LastWorkTime: params[4].(int64),
TotalRunCount: params[5].(int32),
ExecTime: params[6].(int32),
for _, param := range params {
runningModel := param.(*nodemanagerV2.RunningModel)
runningModels = append(runningModels, runningModel)
}
runningModels = append(runningModels, model)
addModelRunningRes := &nodemanagerV2.WorkerMessage{
Message: &nodemanagerV2.WorkerMessage_AddModelRunning{
AddModelRunning: &nodemanagerV2.AddModelRunning{
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment