Commit e7ea8956 authored by duanjinfei's avatar duanjinfei

update msg resp

parent 6f6a360d
...@@ -70,12 +70,12 @@ func (c *NodeController) SetBenefitAddress() { ...@@ -70,12 +70,12 @@ func (c *NodeController) SetBenefitAddress() {
func (c *NodeController) ListHistoryBenefitAddress() { func (c *NodeController) ListHistoryBenefitAddress() {
fileBenefitAcc, _ := utils.ReadBenefitFile() fileBenefitAcc, _ := utils.ReadBenefitFile()
res := make([]*models.BenefitAddressStruct, 0) res := make([]string, 0)
for _, addressStruct := range fileBenefitAcc { for _, addressStruct := range fileBenefitAcc {
if addressStruct.IsDel { if addressStruct.IsDel {
continue continue
} }
res = append(res, addressStruct) res = append(res, addressStruct.Address)
} }
c.ResponseInfo(200, "list history benefit address successful", res) c.ResponseInfo(200, "list history benefit address successful", res)
} }
......
...@@ -29,7 +29,7 @@ func NewModelHandler(dockerOp *operate.DockerOp) *ModelHandler { ...@@ -29,7 +29,7 @@ func NewModelHandler(dockerOp *operate.DockerOp) *ModelHandler {
} }
func (m *ModelHandler) MonitorModelInfo() { func (m *ModelHandler) MonitorModelInfo() {
ticker := time.NewTicker(time.Second * 1) ticker := time.NewTicker(time.Millisecond)
defer ticker.Stop() defer ticker.Stop()
for { for {
select { select {
...@@ -85,6 +85,10 @@ func (m *ModelHandler) MonitorModelInfo() { ...@@ -85,6 +85,10 @@ func (m *ModelHandler) MonitorModelInfo() {
continue continue
} }
} }
if modelInfo.PublishStatus == models.ModelPublishStatusYes {
log.WithField("model image name", modelInfo.ImageName).Info("pulling image")
go m.dockerOp.PullImage(model.ImageName)
}
} }
ticker = time.NewTicker(time.Minute * 10) ticker = time.NewTicker(time.Minute * 10)
} }
...@@ -128,7 +132,7 @@ func (m *ModelHandler) GetRpcModelsResp() (*nodemanagerV2.ModelsInfo, error) { ...@@ -128,7 +132,7 @@ func (m *ModelHandler) GetRpcModelsResp() (*nodemanagerV2.ModelsInfo, error) {
} }
func (m *ModelHandler) MonitorModelStatus() { func (m *ModelHandler) MonitorModelStatus() {
ticker := time.NewTicker(time.Second * 5) ticker := time.NewTicker(time.Millisecond)
defer ticker.Stop() defer ticker.Stop()
for { for {
select { select {
...@@ -151,26 +155,26 @@ func (m *ModelHandler) MonitorModelStatus() { ...@@ -151,26 +155,26 @@ func (m *ModelHandler) MonitorModelStatus() {
} }
} }
} }
containerList := m.dockerOp.ListContainer() //containerList := m.dockerOp.ListContainer()
if containerList != nil && len(containerList) > 0 { //if containerList != nil && len(containerList) > 0 {
for _, container := range containerList { // for _, container := range containerList {
key := container.Image // key := container.Image
model, err := db.GetModel(key) // model, err := db.GetModel(key)
if err != nil || model == nil { // if err != nil || model == nil {
continue // continue
} // }
if container.State == "running" && !model.IsRunning { // if container.State == "running" && !model.IsRunning {
model.ContainerId = container.ID // model.ContainerId = container.ID
model.LastRunTime = time.Now().Unix() // model.LastRunTime = time.Now().Unix()
model.IsRunning = true // model.IsRunning = true
err = db.PutModel(key, model) // err = db.PutModel(key, model)
if err != nil { // if err != nil {
continue // continue
} // }
} // }
//
} // }
} //}
} }
} }
} }
......
...@@ -3,6 +3,7 @@ package nm ...@@ -3,6 +3,7 @@ package nm
import ( import (
"context" "context"
"example.com/m/conf" "example.com/m/conf"
"example.com/m/db"
"example.com/m/largeModel" "example.com/m/largeModel"
"example.com/m/log" "example.com/m/log"
"example.com/m/models" "example.com/m/models"
...@@ -10,6 +11,7 @@ import ( ...@@ -10,6 +11,7 @@ import (
"example.com/m/utils" "example.com/m/utils"
nodemanagerV2 "github.com/odysseus/odysseus-protocol/gen/proto/go/nodemanager/v2" nodemanagerV2 "github.com/odysseus/odysseus-protocol/gen/proto/go/nodemanager/v2"
"google.golang.org/grpc" "google.golang.org/grpc"
"strconv"
"time" "time"
) )
...@@ -65,6 +67,8 @@ func (m *MonitorNm) monitorNmClient() { ...@@ -65,6 +67,8 @@ func (m *MonitorNm) monitorNmClient() {
nodeManagerHandler := NewNodeManagerHandler(nodeManager, worker, msgRespWorker, taskMsgWorker) nodeManagerHandler := NewNodeManagerHandler(nodeManager, worker, msgRespWorker, taskMsgWorker)
log.Info("Report model info started") log.Info("Report model info started")
go m.monitorModel(msgRespWorker, nodeManager, worker)
go nodeManagerHandler.MonitorStandardTaskWorker() go nodeManagerHandler.MonitorStandardTaskWorker()
log.Info("Monitor standard task worker started") log.Info("Monitor standard task worker started")
...@@ -135,3 +139,51 @@ func (m *MonitorNm) monitorNodeManagerSeed() { ...@@ -135,3 +139,51 @@ func (m *MonitorNm) monitorNodeManagerSeed() {
} }
} }
} }
// monitorModel reports installed models to the node manager: once
// immediately at startup (models whose Docker image is already present
// locally), then periodically for models marked installed in the DB.
// The first periodic check fires after 1s; subsequent checks every 10min.
//
// Fixes vs. previous revision:
//   - reuse a single ticker via Reset instead of allocating a new ticker
//     on every tick (the deferred Stop only covered the most recent one,
//     so every earlier ticker leaked).
//   - re-read the model list from the DB on each tick instead of reusing
//     the snapshot taken at startup, so models installed after startup
//     are eventually reported.
//
// NOTE(review): this goroutine has no stop signal; it runs for the
// process lifetime — confirm that is intended.
func (m *MonitorNm) monitorModel(msgRespWorker *RespMsgWorker, nodeManager *models.NodeManagerClient, worker nodemanagerV2.NodeManagerService_RegisterWorkerClient) {
	// reported tracks image names already sent so each model is reported once.
	reported := make(map[string]bool)

	images, err := m.DockerOp.PsImageNameMap()
	if err != nil {
		log.WithError(err).Error("Get image name map failed")
		return
	}
	allModels, err := db.GetAllModels()
	if err != nil {
		log.WithError(err).Error("Get all models failed")
		return
	}

	// Initial report: models whose image already exists locally.
	installed := make([]interface{}, 0, len(allModels))
	for _, model := range allModels {
		if reported[model.ImageName] || !images[model.ImageName] {
			continue
		}
		reported[model.ImageName] = true
		// DiskSize is stored as a string; a parse failure reports 0.
		diskSize, _ := strconv.ParseInt(model.HardwareRequire.DiskSize, 10, 64)
		installed = append(installed, &nodemanagerV2.InstalledModel{
			ModelId:       strconv.FormatUint(model.TaskId, 10),
			DiskSize:      diskSize,
			InstalledTime: model.SetupTime,
			LastRunTime:   model.LastRunTime,
		})
	}
	msgRespWorker.RegisterMsgResp(nodeManager, worker, AddModelInstalledResp, utils.BuildParams(installed...))

	ticker := time.NewTicker(time.Second * 1)
	defer ticker.Stop()
	for range ticker.C {
		// Refresh from the DB so models installed after startup are seen.
		currentModels, err := db.GetAllModels()
		if err != nil {
			log.WithError(err).Error("Get all models failed")
			ticker.Reset(time.Minute * 10)
			continue
		}
		batch := make([]interface{}, 0)
		for _, model := range currentModels {
			if reported[model.ImageName] || !model.IsInstalled {
				continue
			}
			reported[model.ImageName] = true
			diskSize, _ := strconv.ParseInt(model.HardwareRequire.DiskSize, 10, 64)
			batch = append(batch, &nodemanagerV2.InstalledModel{
				ModelId:       strconv.FormatUint(model.TaskId, 10),
				DiskSize:      diskSize,
				InstalledTime: model.SetupTime,
				LastRunTime:   model.LastRunTime,
			})
		}
		// Matches previous behavior: a (possibly empty) report is sent
		// on every tick.
		msgRespWorker.RegisterMsgResp(nodeManager, worker, AddModelInstalledResp, utils.BuildParams(batch...))
		// Widen the interval after the first tick, reusing the same ticker.
		ticker.Reset(time.Minute * 10)
	}
}
...@@ -264,7 +264,7 @@ func (n *NodeManagerHandler) MonitorImageOp(op *nodemanagerV2.ModelOperate) { ...@@ -264,7 +264,7 @@ func (n *NodeManagerHandler) MonitorImageOp(op *nodemanagerV2.ModelOperate) {
model.IsInstalled = true model.IsInstalled = true
model.SetupTime = time.Now().Unix() model.SetupTime = time.Now().Unix()
diskSize, _ := strconv.ParseInt(model.HardwareRequire.DiskSize, 10, 64) diskSize, _ := strconv.ParseInt(model.HardwareRequire.DiskSize, 10, 64)
params := utils.BuildParams(strconv.FormatUint(model.TaskId, 10), diskSize, model.SetupTime, model.LastRunTime) params := utils.BuildParams(&nodemanagerV2.InstalledModel{ModelId: strconv.FormatUint(model.TaskId, 10), DiskSize: diskSize, InstalledTime: model.SetupTime, LastRunTime: model.LastRunTime})
n.msgRespWorker.RegisterMsgResp(n.nodeManager, n.worker, AddModelInstalledResp, params) n.msgRespWorker.RegisterMsgResp(n.nodeManager, n.worker, AddModelInstalledResp, params)
return return
} }
......
...@@ -296,13 +296,10 @@ func GoodbyeResp(params ...interface{}) *nodemanagerV2.WorkerMessage { ...@@ -296,13 +296,10 @@ func GoodbyeResp(params ...interface{}) *nodemanagerV2.WorkerMessage {
func AddModelInstalledResp(params ...interface{}) *nodemanagerV2.WorkerMessage { func AddModelInstalledResp(params ...interface{}) *nodemanagerV2.WorkerMessage {
log.Info("Add model installed info response received params:", params) log.Info("Add model installed info response received params:", params)
installedModels := make([]*nodemanagerV2.InstalledModel, 0) installedModels := make([]*nodemanagerV2.InstalledModel, 0)
model := &nodemanagerV2.InstalledModel{ for _, param := range params {
ModelId: params[0].(string), model := param.(*nodemanagerV2.InstalledModel)
DiskSize: params[1].(int64), installedModels = append(installedModels, model)
InstalledTime: params[2].(int64),
LastRunTime: params[3].(int64),
} }
installedModels = append(installedModels, model)
deviceInfoRes := &nodemanagerV2.WorkerMessage{ deviceInfoRes := &nodemanagerV2.WorkerMessage{
Message: &nodemanagerV2.WorkerMessage_AddModelInstalled{ Message: &nodemanagerV2.WorkerMessage_AddModelInstalled{
AddModelInstalled: &nodemanagerV2.AddModelInstalled{ AddModelInstalled: &nodemanagerV2.AddModelInstalled{
...@@ -337,7 +334,7 @@ func AddModelRunningResp(params ...interface{}) *nodemanagerV2.WorkerMessage { ...@@ -337,7 +334,7 @@ func AddModelRunningResp(params ...interface{}) *nodemanagerV2.WorkerMessage {
StartedTime: params[3].(int64), StartedTime: params[3].(int64),
LastWorkTime: params[4].(int64), LastWorkTime: params[4].(int64),
TotalRunCount: params[5].(int32), TotalRunCount: params[5].(int32),
WaitTime: params[6].(int32), ExecTime: params[6].(int32),
} }
runningModels = append(runningModels, model) runningModels = append(runningModels, model)
addModelRunningRes := &nodemanagerV2.WorkerMessage{ addModelRunningRes := &nodemanagerV2.WorkerMessage{
......
...@@ -32,12 +32,14 @@ func StartMonitor() { ...@@ -32,12 +32,14 @@ func StartMonitor() {
monitorNm := NewMonitorNm(dockerOp, modelHandler) monitorNm := NewMonitorNm(dockerOp, modelHandler)
go modelHandler.MonitorModelInfo()
go modelHandler.MonitorModelStatus()
go monitorNm.monitorNodeManagerSeed() go monitorNm.monitorNodeManagerSeed()
go monitorNm.monitorNmClient() go monitorNm.monitorNmClient()
go modelHandler.MonitorModelInfo()
for !monitorNm.IsInit { for !monitorNm.IsInit {
time.Sleep(time.Second) time.Sleep(time.Second)
} }
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment