Commit 649a3e9a authored by duanjinfei

update register msg resp

parent dec684af
@@ -9,11 +9,11 @@ import (
 	"sort"
 )
 
-var dbInstance *leveldb.DB
-
-var err error
-
-var modelKeys map[string]bool
+var (
+	dbInstance *leveldb.DB
+	err        error
+	modelKeys  map[string]bool
+)
 
 func init() {
 	// Open or create a LevelDB database
...
@@ -19,12 +19,14 @@ import (
 type ModelHandler struct {
 	dockerOp *operate.DockerOp
 	client   *http.Client
+	IsInit   bool
 }
 
 func NewModelHandler(dockerOp *operate.DockerOp) *ModelHandler {
 	return &ModelHandler{
 		dockerOp: dockerOp,
 		client:   &http.Client{},
+		IsInit:   false,
 	}
 }
@@ -97,6 +99,7 @@ func (m *ModelHandler) MonitorModelInfo() {
 				go m.dockerOp.PullImage(model.ImageName)
 			}
 		}
+		m.IsInit = true
 		ticker = time.NewTicker(time.Minute * 10)
 	}
 }
@@ -191,3 +194,49 @@ func (m *ModelHandler) MonitorModelStatus() {
 		}
 	}
 }
+func (m *ModelHandler) ScanModelsResp() (*nodemanagerV2.ModelsInfo, error) {
+	installedModels := make([]*nodemanagerV2.InstalledModel, 0)
+	runningModels := make([]*nodemanagerV2.RunningModel, 0)
+	images, err := m.dockerOp.PsImageNameMap()
+	if err != nil {
+		log.WithError(err).Error("Get images failed")
+		return nil, err
+	}
+	containerList := m.dockerOp.ListContainer()
+	if len(containerList) == 0 {
+		log.Error("Get container list failed")
+		return nil, fmt.Errorf("get container list failed")
+	}
+	allModels, err := db.GetAllModels()
+	if err != nil {
+		log.WithError(err).Error("Get all models failed")
+		return nil, fmt.Errorf("get all models failed")
+	}
+	for _, model := range allModels {
+		if !images[model.ImageName] {
+			continue
+		}
+		diskSize, err := strconv.ParseInt(model.HardwareRequire.DiskSize, 10, 64)
+		if err != nil {
+			continue
+		}
+		installedModels = append(installedModels, &nodemanagerV2.InstalledModel{
+			ModelId:       strconv.FormatUint(model.TaskId, 10),
+			DiskSize:      diskSize,
+			InstalledTime: model.SetupTime,
+			LastRunTime:   model.LastRunTime,
+		})
+		containerIsExist := false
+		for _, container := range containerList {
+			if model.ImageName == container.Image {
+				containerIsExist = true
+				break
+			}
+		}
+		if containerIsExist {
+			runningModels = append(runningModels, &nodemanagerV2.RunningModel{
+				ModelId:       strconv.FormatUint(model.TaskId, 10),
+				GpuSeq:        model.GpuSeq,
+				GpuRam:        model.RunningMem,
+				StartedTime:   model.LastRunTime,
+				LastWorkTime:  model.LastWorkTime,
+				TotalRunCount: model.TotalRunCount,
+				ExecTime:      model.EstimatExeTime,
+			})
+		}
+	}
+	res := &nodemanagerV2.ModelsInfo{
+		InstalledModels: installedModels,
+		RunningModels:   runningModels,
+	}
+	return res, nil
+}
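Note that ScanModelsResp returns an error whenever the container list comes back empty, which can legitimately happen right after startup before any model container is running. A caller that races startup may therefore want a small retry wrapper; a hedged sketch, assuming the surrounding nm package (scanWithRetry is hypothetical, not part of this commit):

package nm

import (
	"time"

	"example.com/m/largeModel"
	nodemanagerV2 "github.com/odysseus/odysseus-protocol/gen/proto/go/nodemanager/v2"
)

// scanWithRetry retries ScanModelsResp a few times before giving up,
// smoothing over the empty-container-list window at startup.
func scanWithRetry(h *largeModel.ModelHandler, attempts int) (*nodemanagerV2.ModelsInfo, error) {
	var lastErr error
	for i := 0; i < attempts; i++ {
		info, err := h.ScanModelsResp()
		if err == nil {
			return info, nil
		}
		lastErr = err
		time.Sleep(time.Second)
	}
	return nil, lastErr
}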
@@ -3,7 +3,6 @@ package nm
 import (
 	"context"
 	"example.com/m/conf"
-	"example.com/m/db"
 	"example.com/m/largeModel"
 	"example.com/m/log"
 	"example.com/m/models"
@@ -11,7 +10,6 @@ import (
 	"example.com/m/utils"
 	nodemanagerV2 "github.com/odysseus/odysseus-protocol/gen/proto/go/nodemanager/v2"
 	"google.golang.org/grpc"
-	"strconv"
 	"time"
 )
@@ -67,10 +65,6 @@ func (m *MonitorNm) monitorNmClient() {
 			nodeManagerHandler := NewNodeManagerHandler(nodeManager, worker, msgRespWorker, taskMsgWorker)
 			log.Info("Report model info started")
 
-			go m.monitorInstallModel(msgRespWorker, nodeManager, worker)
-			go m.monitorRunningModel(msgRespWorker, nodeManager, worker)
-
 			go nodeManagerHandler.MonitorStandardTaskWorker()
 			log.Info("Monitor standard task worker started")
@@ -141,108 +135,3 @@ func (m *MonitorNm) monitorNodeManagerSeed() {
 		}
 	}
 }
-func (m *MonitorNm) monitorInstallModel(msgRespWorker *RespMsgWorker, nodeManager *models.NodeManagerClient, worker nodemanagerV2.NodeManagerService_RegisterWorkerClient) {
-	reportModel := make(map[string]bool, 0)
-	images, err := m.DockerOp.PsImageNameMap()
-	if err != nil {
-		log.WithError(err).Error("Get image name map failed")
-		return
-	}
-	allModels, err := db.GetAllModels()
-	if err != nil {
-		log.WithError(err).Error("Get all models failed")
-		return
-	}
-	addInstallModels := make([]interface{}, 0)
-	for _, model := range allModels {
-		isExist := images[model.ImageName]
-		if reportModel[model.ImageName] || !isExist {
-			continue
-		}
-		reportModel[model.ImageName] = true
-		diskSize, _ := strconv.ParseInt(model.HardwareRequire.DiskSize, 10, 64)
-		addInstallModels = append(addInstallModels, &nodemanagerV2.InstalledModel{ModelId: strconv.FormatUint(model.TaskId, 10), DiskSize: diskSize, InstalledTime: model.SetupTime, LastRunTime: model.LastRunTime})
-	}
-	params := utils.BuildParams(addInstallModels...)
-	msgRespWorker.RegisterMsgResp(nodeManager, worker, AddModelInstalledResp, params)
-	ticker := time.NewTicker(time.Second * 1)
-	defer ticker.Stop()
-	for {
-		select {
-		case <-ticker.C:
-			{
-				addInstallModels := make([]interface{}, 0)
-				for _, model := range allModels {
-					if reportModel[model.ImageName] || !model.IsInstalled {
-						continue
-					}
-					reportModel[model.ImageName] = true
-					diskSize, _ := strconv.ParseInt(model.HardwareRequire.DiskSize, 10, 64)
-					addInstallModels = append(addInstallModels, &nodemanagerV2.InstalledModel{ModelId: strconv.FormatUint(model.TaskId, 10), DiskSize: diskSize, InstalledTime: model.SetupTime, LastRunTime: model.LastRunTime})
-				}
-				params := utils.BuildParams(addInstallModels...)
-				msgRespWorker.RegisterMsgResp(nodeManager, worker, AddModelInstalledResp, params)
-				ticker = time.NewTicker(time.Minute * 10)
-			}
-		}
-	}
-}
-func (m *MonitorNm) monitorRunningModel(msgRespWorker *RespMsgWorker, nodeManager *models.NodeManagerClient, worker nodemanagerV2.NodeManagerService_RegisterWorkerClient) {
-	reportModel := make(map[string]bool, 0)
-	containerList := m.DockerOp.ListContainer()
-	if containerList == nil || len(containerList) == 0 {
-		log.Error("Get container failed")
-		return
-	}
-	allModels, err := db.GetAllModels()
-	if err != nil {
-		log.WithError(err).Error("Get all models failed")
-		return
-	}
-	addRunningModels := make([]interface{}, 0)
-	for _, model := range allModels {
-		isExist := false
-		for _, container := range containerList {
-			if model.ImageName == container.Image {
-				isExist = true
-			}
-		}
-		if reportModel[model.ImageName] || !isExist {
-			continue
-		}
-		reportModel[model.ImageName] = true
-		addRunningModels = append(addRunningModels, &nodemanagerV2.RunningModel{ModelId: strconv.FormatUint(model.TaskId, 10), GpuSeq: model.GpuSeq, GpuRam: model.RunningMem, StartedTime: model.LastRunTime, LastWorkTime: model.LastWorkTime, TotalRunCount: model.TotalRunCount, ExecTime: model.EstimatExeTime})
-	}
-	params := utils.BuildParams(addRunningModels...)
-	msgRespWorker.RegisterMsgResp(nodeManager, worker, AddModelRunningResp, params)
-	ticker := time.NewTicker(time.Second * 1)
-	defer ticker.Stop()
-	for {
-		select {
-		case <-ticker.C:
-			{
-				addRunningModels := make([]interface{}, 0)
-				for _, model := range allModels {
-					isExist := false
-					for _, container := range containerList {
-						if model.ImageName == container.Image {
-							isExist = true
-						}
-					}
-					if reportModel[model.ImageName] || !isExist {
-						continue
-					}
-					reportModel[model.ImageName] = true
-					addRunningModels = append(addRunningModels, &nodemanagerV2.RunningModel{ModelId: strconv.FormatUint(model.TaskId, 10), GpuSeq: model.GpuSeq, GpuRam: model.RunningMem, StartedTime: model.LastRunTime, LastWorkTime: model.LastWorkTime, TotalRunCount: model.TotalRunCount, ExecTime: model.EstimatExeTime})
-				}
-				params := utils.BuildParams(addRunningModels...)
-				msgRespWorker.RegisterMsgResp(nodeManager, worker, AddModelRunningResp, params)
-				ticker = time.NewTicker(time.Minute * 10)
-			}
-		}
-	}
-}
@@ -90,7 +90,7 @@ func RegisterInfoResp(params ...interface{}) *nodemanagerV2.WorkerMessage {
 	sign, _ := crypto.Sign(signHash.Bytes(), conf.GetConfig().SignPrivateKey)
 	log.Info("register message sign:", common.Bytes2Hex(sign))
 	modelsInfo := params[0].(*largeModel.ModelHandler)
-	readModels, err := modelsInfo.GetRpcModelsResp()
+	readModels, err := modelsInfo.ScanModelsResp()
 	if err != nil {
 		return nil
 	}
...
@@ -43,13 +43,13 @@ func StartMonitor() {
 	go monitorNm.monitorNodeManagerSeed()
 	log.WithField("func", "monitorNodeManagerSeed").Info("--------------------Start monitorNm--------------------")
 
-	go monitorNm.monitorNmClient()
-	log.WithField("func", "monitorNmClient").Info("--------------------Start monitorNm--------------------")
-	for !monitorNm.IsInit {
+	for !monitorNm.IsInit && !modelHandler.IsInit {
 		time.Sleep(time.Second)
 	}
+	go monitorNm.monitorNmClient()
+	log.WithField("func", "monitorNmClient").Info("--------------------Start monitorNm--------------------")
 
 	var connectNodeManagerCount int64 = 0
 	selectClientRandomNum := utils.GenerateRandomNumber(conf.GetConfig().SignPrivateKey, conf.GetConfig().NodeManagerNum)
 	isSelect := false
...
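One subtlety in the new wait loop: !monitorNm.IsInit && !modelHandler.IsInit stops blocking as soon as either flag flips, because the conjunction goes false once a single operand does. If the intent is to start monitorNmClient only after both initializations complete, the condition needs || instead, or equivalently a helper like the sketch below (waitForInit is hypothetical, not part of this commit):

package nm

import "time"

// waitForInit blocks until every probe reports ready, polling once per
// second. Probes are closures, e.g. func() bool { return monitorNm.IsInit }.
func waitForInit(probes ...func() bool) {
	for {
		ready := true
		for _, p := range probes {
			if !p() {
				ready = false
				break
			}
		}
		if ready {
			return
		}
		time.Sleep(time.Second)
	}
}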