Commit 59f14f68 authored by duanjinfei

update msg resp

parent 94e34c67
@@ -7,7 +7,7 @@ import (
 	"example.com/m/nm"
 	"example.com/m/operate"
 	"example.com/m/utils"
-	nodeManagerV1 "github.com/odysseus/odysseus-protocol/gen/proto/go/nodemanager/v1"
+	nodemanagerV2 "github.com/odysseus/odysseus-protocol/gen/proto/go/nodemanager/v2"
 	"io"
 )
@@ -84,7 +84,7 @@ func (c *NodeController) AddNodeManager() {
 		c.ResponseInfo(500, "param error", "")
 		return
 	}
-	nodeManager := &nodeManagerV1.NodeManagerInfo{
+	nodeManager := &nodemanagerV2.NodeManagerInfo{
 		Publickey: req.PublicKey,
 		Endpoint:  req.EndPoint,
 	}
@@ -130,3 +130,7 @@ func (c *NodeController) GetRecvStatus() {
 func (c *NodeController) GetConfigInfo() {
 	c.ResponseInfo(200, "get config successful", conf.GetConfig())
 }
+
+func (c *NodeController) GetBenefit() {
+	c.ResponseInfo(200, "get benefit address successful", conf.GetConfig().BenefitAddress)
+}
@@ -61,10 +61,25 @@ func (c *StateController) GetGpuUsageInfo() {
 func (c *StateController) GetOtherHardwareInfo() {
 	info := utils.GetHardwareInfo()
+	var diskTotal, diskFree int64
+	for _, disk := range info.Data.Disk {
+		for _, point := range disk.MountPoints {
+			if point == "/" {
+				diskTotal += disk.SizeBytes
+				diskFree += disk.FreeBytes
+			}
+		}
+	}
+	diskUsage := int32((1 - diskFree/diskTotal) * 100)
 	res := &models.OtherHardwareInfoResp{
-		CpuTemp:   info.Data.Cpus.Usage,
-		RamUsage:  info.Data.Mem.Total,
-		DiskUsage: info.Data.Disk[0].Total,
+		NodeID:       conf.GetConfig().SignPublicAddress.Hex(),
+		CpuName:      info.Data.Cpus.Model,
+		CpuUsage:     info.Data.Cpus.Usage,
+		CpuFrequency: info.Data.Cpus.Frequency,
+		RamSize:      info.Data.Mem.Total,
+		RamUsage:     info.Data.Mem.MemUtil,
+		DiskSize:     diskTotal,
+		DiskUsage:    diskUsage,
 	}
 	c.ResponseInfo(200, "get hardware info successful", res)
 }
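For reference, a self-contained sketch of the root-mount aggregation the new GetOtherHardwareInfo performs. The diskEntry type is a stand-in for the disk entries returned by utils.GetHardwareInfo() (the SizeBytes, FreeBytes and MountPoints names are taken from the diff); the percentage here is computed in float64, whereas the handler above uses int64 operands, so diskFree/diskTotal truncates to 0 or 1 before the multiplication.

```go
package main

import "fmt"

// diskEntry is a stand-in for the disk entries returned by utils.GetHardwareInfo();
// field names mirror the new handler above.
type diskEntry struct {
	MountPoints []string
	SizeBytes   int64
	FreeBytes   int64
}

// rootDiskUsage sums size/free bytes of every entry mounted at "/" and derives
// a usage percentage with float64 math so the division is not truncated.
func rootDiskUsage(disks []diskEntry) (total, free int64, usagePct int32) {
	for _, d := range disks {
		for _, point := range d.MountPoints {
			if point == "/" {
				total += d.SizeBytes
				free += d.FreeBytes
			}
		}
	}
	if total > 0 {
		usagePct = int32((1 - float64(free)/float64(total)) * 100)
	}
	return total, free, usagePct
}

func main() {
	disks := []diskEntry{{MountPoints: []string{"/"}, SizeBytes: 500_000_000_000, FreeBytes: 125_000_000_000}}
	_, _, pct := rootDiskUsage(disks)
	fmt.Println(pct) // 75
}
```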
@@ -16,23 +16,6 @@ func init() {
 	if err != nil {
 		log.Error("Leveldb open file failed: ", err)
 	}
-	// iterate over the database and delete all stored data
-	iter := dbInstance.NewIterator(nil, nil)
-	for iter.Next() {
-		key := iter.Key()
-		// delete the data stored under this key
-		if err := dbInstance.Delete(key, nil); err != nil {
-			log.Error("Leveldb delete failed: ", err)
-		}
-	}
-	iter.Release()
-	//defer func(dbInstance *leveldb.DB) {
-	//	err := dbInstance.Close()
-	//	if err != nil {
-	//		log.Error("Leveldb close file failed: ", err)
-	//	}
-	//}(dbInstance)
 }
 func Put(key string, value []byte) error {
...
@@ -7,6 +7,7 @@ import (
 	"example.com/m/models"
 	"example.com/m/operate"
 	"fmt"
+	nodemanagerV2 "github.com/odysseus/odysseus-protocol/gen/proto/go/nodemanager/v2"
 	"io"
 	"net/http"
 	"os"
@@ -61,11 +62,11 @@ func (m *ModelHandler) MonitorModelInfo() {
 			continue
 		}
 		modelInfosResp := resp.Data
-		imageNameMap, err := m.dockerOp.PsImageNameMap()
-		if err != nil {
-			log.Error("Docker op ps images failed:", err)
-			continue
-		}
+		//imageNameMap, err := m.dockerOp.PsImageNameMap()
+		//if err != nil {
+		//	log.Error("Docker op ps images failed:", err)
+		//	continue
+		//}
 		reportTaskIds := make([]uint64, 0)
 		for _, modelInfo := range modelInfosResp {
 			if modelInfo.ImageName == "" {
@@ -76,19 +77,22 @@ func (m *ModelHandler) MonitorModelInfo() {
 			if len(split) != 2 {
 				continue
 			}
-			if !imageNameMap[modelInfo.ImageName] {
+			{
+				//if !imageNameMap[modelInfo.ImageName] {
 				// todo: check whether the machine has enough resources
-				isPull := m.isResourceEnough(modelInfo)
+				//isPull := m.isResourceEnough(modelInfo)
 				// todo: if resources are sufficient
-				if isPull && modelInfo.PublishStatus == models.ModelPublishStatusYes {
-					log.WithField("model image name", modelInfo.ImageName).Info("pulling image")
-					go m.dockerOp.PullImage(modelInfo)
-				}
-			} else {
-				log.WithField("name", modelInfo.ImageName).Info("The image name is already")
-				m.dockerOp.BootUpModelId[modelInfo.ImageName] = modelInfo.TaskId
-				reportTaskIds = append(reportTaskIds, modelInfo.TaskId)
+				//if isPull && modelInfo.PublishStatus == models.ModelPublishStatusYes {
+				// log.WithField("model image name", modelInfo.ImageName).Info("pulling image")
+				// go m.dockerOp.PullImage(modelInfo)
+				//}
+				//} else {
+				//
+				//}
 			}
+			log.WithField("name", modelInfo.ImageName).Info("The image name is already")
+			m.dockerOp.BootUpModelId[modelInfo.ImageName] = modelInfo.TaskId
+			reportTaskIds = append(reportTaskIds, modelInfo.TaskId)
 			m.dockerOp.SignApi[modelInfo.ImageName] = modelInfo.SignUrl
 		}
 		m.dockerOp.ModelsInfo = modelInfosResp
@@ -127,13 +131,12 @@ func (m *ModelHandler) ReadModels() ([]*models.ModelInfo, error) {
 	}
 	return resp.Data, nil
 }
+
+func (m *ModelHandler) GetRpcModelsResp() (*nodemanagerV2.ModelsInfo, error) {
+	return nil, nil
+}
 func (m *ModelHandler) isResourceEnough(modelInfo *models.ModelInfo) bool {
-	//isDownload := m.checkDiskUsage(modelInfo)
-	//isDownload = true
-	//if !isDownload {
-	//	return isDownload
-	//}
 	return true
 }
...
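GetRpcModelsResp is introduced here as a stub (it returns nil, nil). A hedged sketch of one way it could be filled in, assuming it lives in the largeModel package, reuses ReadModels, and buckets entries by the new IsInstalled/IsRunning flags on models.ModelInfo; the field mapping onto nodemanagerV2.InstalledModel and nodemanagerV2.RunningModel is left empty because those message fields are not shown in this commit.

```go
// Sketch only - not the commit's implementation.
func (m *ModelHandler) getRpcModelsRespSketch() (*nodemanagerV2.ModelsInfo, error) {
	local, err := m.ReadModels()
	if err != nil {
		return nil, err
	}
	res := &nodemanagerV2.ModelsInfo{
		InstalledModels: make([]*nodemanagerV2.InstalledModel, 0),
		RunningModels:   make([]*nodemanagerV2.RunningModel, 0),
	}
	for _, info := range local {
		if info.IsInstalled {
			// field mapping from models.ModelInfo omitted; not shown in this diff
			res.InstalledModels = append(res.InstalledModels, &nodemanagerV2.InstalledModel{})
		}
		if info.IsRunning {
			res.RunningModels = append(res.RunningModels, &nodemanagerV2.RunningModel{})
		}
	}
	return res, nil
}
```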
@@ -65,13 +65,13 @@ type GpuInfo struct {
 type Gpu struct {
 	Seq         int64  `json:"seq"`
 	Uuid        string `json:"uuid"`
-	Model       string `json:"model "`
+	Model       string `json:"model"`
 	Performance int64  `json:"performance"`
 	PowerRating int64  `json:"power_rating"`
 	MemTotal    int64  `json:"mem_total"`
 	MemFree     int64  `json:"mem_free"`
 	Usage       int64  `json:"usage"`
-	Temp        int64  `json:"temp "`
+	Temp        int64  `json:"temp"`
 	PowerRt     int64  `json:"power_rt"`
 }
@@ -92,8 +92,9 @@ type CoreCpuInfo struct {
 }
 type Mem struct {
 	Total   int64 `json:"total"`
 	Free    int64 `json:"free"`
+	MemUtil int32 `json:"mem_util"`
 }
 type Disk struct {
@@ -141,6 +142,10 @@ type ModelInfo struct {
 	EstimatExeTime int64 `json:"estimat_exe_time"`
 	StartUpTime    int64 `json:"start_up_time"`
 	RunningMem     int64 `json:"running_mem"`
+	SetupTime      int64 `json:"setup_time"`
+	LastRunTime    int64 `json:"last_run_time"`
+	IsInstalled    bool  `json:"is_installed"`
+	IsRunning      bool  `json:"is_running"`
 }
 type HealthyCheck struct {
...
@@ -21,6 +21,7 @@ type RunningState struct {
 	RunningTime        int64  `json:"running_time"`
 	CompletedTaskCount int    `json:"completed_task_count"`
 	NmIpAddr           string `json:"nm_ip_addr"`
+	NmLocation         string `json:"nm_location"`
 	NmDelayTime        int64  `json:"nm_delay_time"`
 }
@@ -29,29 +30,19 @@ type WorkerAccount struct {
 	ChainID int64 `json:"chain_id"`
 }
-type GpuInfoResp struct {
-	Seq      int    `json:"seq"`
-	Name     string `json:"name"`
-	TotalMem int64  `json:"total_mem"`
-	UtilMem  int64  `json:"util_mem"`
-	FreeMem  int64  `json:"free_mem"`
-}
 type GpuUsageReq struct {
 	Seq int64 `json:"seq"`
 }
-type GpuUsageInfoResp struct {
-	Seq    int   `json:"seq"`
-	Occupy int   `json:"occupy"`
-	Usage  int64 `json:"usage"`
-	Temp   int   `json:"temp"`
-}
 type OtherHardwareInfoResp struct {
-	CpuTemp   int64 `json:"cpu_temp"`
-	RamUsage  int64 `json:"ram_usage"`
-	DiskUsage int64 `json:"disk_usage"`
+	NodeID       string `json:"node_id"`
+	CpuName      string `json:"cpu_name"`
+	CpuUsage     int32  `json:"cpu_usage"`
+	CpuFrequency string `json:"cpu_frequency"`
+	RamSize      int64  `json:"ram_size"`
+	RamUsage     int32  `json:"ram_usage"`
+	DiskSize     int64  `json:"disk_size"`
+	DiskUsage    int32  `json:"disk_usage"`
 }
 type Resp struct {
...
@@ -26,6 +26,7 @@ func init() {
 		CompletedTaskCount: 0,
 		NmIpAddr:           "",
 		NmDelayTime:        0,
+		NmLocation:         "",
 	}
 }
...
@@ -3,10 +3,11 @@ package nm
 import (
 	"context"
 	"example.com/m/conf"
+	"example.com/m/largeModel"
 	"example.com/m/log"
 	"example.com/m/models"
 	"example.com/m/operate"
-	"example.com/m/validator"
+	"example.com/m/utils"
 	nodemanagerV2 "github.com/odysseus/odysseus-protocol/gen/proto/go/nodemanager/v2"
 	"google.golang.org/grpc"
 	"time"
@@ -17,13 +18,15 @@ type MonitorNm struct {
 	NodeManagerMsgChan chan *nodemanagerV2.ManagerMessage
 	DockerOp           *operate.DockerOp
 	IsInit             bool
+	ModelHandler       *largeModel.ModelHandler
 }
-func NewMonitorNm(DockerOp *operate.DockerOp) *MonitorNm {
+func NewMonitorNm(dockerOp *operate.DockerOp, modelHandler *largeModel.ModelHandler) *MonitorNm {
 	return &MonitorNm{
 		NodeManagerClientChan: make(chan *models.NodeManagerClient, 10),
 		NodeManagerMsgChan:    make(chan *nodemanagerV2.ManagerMessage, 1000),
-		DockerOp:              DockerOp,
+		DockerOp:              dockerOp,
+		ModelHandler:          modelHandler,
 		IsInit:                false,
 	}
 }
@@ -50,7 +53,8 @@ func (m *MonitorNm) monitorNmClient() {
 		taskMsgWorker.DistributionTaskWorker(4)
 		log.Info("Distribution task worker started.......................")
-		msgRespWorker.RegisterMsgResp(nodeManager, worker, RegisterInfoResp, nil)
+		registerRespParam := utils.BuildParams(m.ModelHandler)
+		msgRespWorker.RegisterMsgResp(nodeManager, worker, RegisterInfoResp, registerRespParam)
 		log.Info("------------------------Send register message ended------------------------")
 		msgRespWorker.RegisterMsgResp(nodeManager, worker, DeviceInfoResp, nil)
@@ -65,29 +69,22 @@ func (m *MonitorNm) monitorNmClient() {
 		nodeManagerHandler := NewNodeManagerHandler(nodeManager, worker, msgRespWorker, taskMsgWorker)
 		log.Info("Report model info started")
-		go nodeManagerHandler.ReportResourceMap(m.DockerOp)
-		log.Info("Monitor resource map worker started")
 		go nodeManagerHandler.MonitorStandardTaskWorker()
 		log.Info("Monitor standard task worker started")
-		proofWorker := validator.NewProofWorker()
-		// proof storage
-		//go proofWorker.ProofStorage()
-		//log.Info("Proof storage worker started")
-		// proof submission
-		//go proofWorker.CommitWitness()
-		//log.Info("Proof commit worker started")
 		// handle messages
 		for i := 0; i < 2; i++ {
-			go nodeManagerHandler.DistributionMsgWorker(m.NodeManagerMsgChan, proofWorker)
+			go nodeManagerHandler.DistributionMsgWorker(m.NodeManagerMsgChan, m.ModelHandler)
 		}
 		log.Info("------------------------Start rev msg worker thread------------------------")
 		for {
-			if !IsRecvTask {
-				log.Warn("User set recv task status is false")
-				msgRespWorker.RegisterMsgResp(nodeManager, worker, RegisterInfoResp, nil)
-				nodeManager.UpdateStatus(false)
-				return
-			}
 			sub := time.Now().Sub(nodeManager.GetLastHeartTime()).Seconds()
 			log.WithField("time(uint seconds)", sub).Info("Handler nm msg thread monitor heartbeat time")
 			rev, err := worker.Recv()
...
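utils.BuildParams itself is not part of this diff. Given that the response builders are variadic (params ...interface{}) and index params[0], a packer along these lines would be consistent with the call sites above; treat it as an assumption, not the actual implementation.

```go
package utils

// BuildParams packs an arbitrary argument list so a message-response builder
// can later index it, e.g. params[0].(*largeModel.ModelHandler).
func BuildParams(params ...interface{}) []interface{} {
	return params
}
```

With that shape, BuildParams(m.ModelHandler) simply forwards the *largeModel.ModelHandler, and RegisterInfoResp, NodeInfoResp and ModelListResp recover it with a type assertion on params[0].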
@@ -2,11 +2,10 @@ package nm
 import (
 	"example.com/m/conf"
+	"example.com/m/largeModel"
 	"example.com/m/log"
 	"example.com/m/models"
-	"example.com/m/operate"
 	"example.com/m/utils"
-	"example.com/m/validator"
 	"fmt"
 	nodemanagerV2 "github.com/odysseus/odysseus-protocol/gen/proto/go/nodemanager/v2"
 	"time"
@@ -28,7 +27,7 @@ func NewNodeManagerHandler(nodeManager *models.NodeManagerClient, worker nodeman
 	}
 }
-func (n *NodeManagerHandler) DistributionMsgWorker(nodeManagerMsgChan chan *nodemanagerV2.ManagerMessage, proofWorker *validator.ProofWorker) {
+func (n *NodeManagerHandler) DistributionMsgWorker(nodeManagerMsgChan chan *nodemanagerV2.ManagerMessage, modelsHandler *largeModel.ModelHandler) {
 	for {
 		select {
 		case rev := <-nodeManagerMsgChan:
@@ -112,7 +111,8 @@ func (n *NodeManagerHandler) DistributionMsgWorker(nodeManagerMsgChan chan *node
 			nodeInfoMsg := rev.GetNodeInfoRequest()
 			if nodeInfoMsg != nil {
-				n.msgRespWorker.RegisterMsgResp(n.nodeManager, n.worker, NodeInfoResp, nil)
+				nodeInfoParam := utils.BuildParams(modelsHandler)
+				n.msgRespWorker.RegisterMsgResp(n.nodeManager, n.worker, NodeInfoResp, nodeInfoParam)
 				log.Info(nodeInfoMsg)
 				continue
 			}
@@ -126,16 +126,25 @@ func (n *NodeManagerHandler) DistributionMsgWorker(nodeManagerMsgChan chan *node
 			modelListMsg := rev.GetModelListRequest()
 			if modelListMsg != nil {
-				n.msgRespWorker.RegisterMsgResp(n.nodeManager, n.worker, ModelListResp, nil)
+				modelListParam := utils.BuildParams(modelsHandler)
+				n.msgRespWorker.RegisterMsgResp(n.nodeManager, n.worker, ModelListResp, modelListParam)
 				log.Info(modelListMsg)
 				continue
 			}
 			modelOpMsg := rev.GetModelOperateRequest()
 			if modelOpMsg != nil {
-				//for _, modelOperate := range modelOpMsg.ModelOperates {
-				//}
+				modelOpMsg.GetModelOperates()
+				continue
+			}
+			deviceInfoMsg := rev.GetDeviceInfoRequest()
+			if deviceInfoMsg != nil {
+				n.msgRespWorker.RegisterMsgResp(n.nodeManager, n.worker, DeviceInfoResp, nil)
+				log.Info(modelListMsg)
 				continue
 			}
 			goodByeMsg := rev.GetGoodbyeMessage()
 			if goodByeMsg != nil {
 				reason := goodByeMsg.GetReason()
@@ -167,29 +176,3 @@ func (n *NodeManagerHandler) MonitorStandardTaskWorker() {
 		}
 	}
 }
-func (n *NodeManagerHandler) ReportResourceMap(dockerOp *operate.DockerOp) {
-	ticker := time.NewTicker(time.Second * 1)
-	for {
-		select {
-		case <-ticker.C:
-			if len(dockerOp.ReportModelIds) > 0 {
-				bootUpModelIds := make([]uint64, 0)
-				containers := dockerOp.ListContainer()
-				if containers != nil && len(containers) > 0 {
-					for _, container := range containers {
-						if container.State == "running" {
-							taskId := dockerOp.BootUpModelId[container.Image]
-							if taskId != 0 {
-								bootUpModelIds = append(bootUpModelIds, taskId)
-							}
-						}
-					}
-				}
-				params := utils.BuildParams(dockerOp.ReportModelIds, bootUpModelIds)
-				n.msgRespWorker.RegisterMsgResp(n.nodeManager, n.worker, SubmitResourceMapRes, params)
-				ticker = time.NewTicker(time.Minute * 10)
-			}
-		}
-	}
-}
@@ -3,10 +3,10 @@ package nm
 import (
 	"bytes"
 	"example.com/m/conf"
+	"example.com/m/largeModel"
 	"example.com/m/log"
 	"example.com/m/models"
 	"example.com/m/utils"
-	"github.com/docker/docker/libnetwork/bitmap"
 	"github.com/ethereum/go-ethereum/common"
 	"github.com/ethereum/go-ethereum/crypto"
 	nodemanagerV2 "github.com/odysseus/odysseus-protocol/gen/proto/go/nodemanager/v2"
@@ -78,45 +78,6 @@ func HeartbeatResp(params ...interface{}) *nodemanagerV2.WorkerMessage {
 	return heartRes
 }
-func SubmitResourceMapRes(params ...interface{}) *nodemanagerV2.WorkerMessage {
-	log.Info("Submit resource map response received params: ", params)
-	existModelIdIndexes := params[0].([]uint64)
-	existMap := bitmap.New(1000000000)
-	for i := 0; i < len(existModelIdIndexes); i++ {
-		modelIdIndex := existModelIdIndexes[i]
-		err := existMap.Set(modelIdIndex)
-		if err != nil {
-			log.WithField("model id index", modelIdIndex).WithField("error", err).Error("Error setting task id index")
-			return nil
-		}
-	}
-	existImage, err := existMap.MarshalBinary()
-	if err != nil {
-		log.Error("bitmap marshal binary failed with error: ", err)
-		return nil
-	}
-	bootUpModelIdIndexes := params[1].([]uint64)
-	bootUpMap := bitmap.New(1000000000)
-	for i := 0; i < len(bootUpModelIdIndexes); i++ {
-		modelIdIndex := bootUpModelIdIndexes[i]
-		err := bootUpMap.Set(modelIdIndex)
-		if err != nil {
-			log.WithField("modelId index", modelIdIndex).WithField("error", err).Error("Error setting task id index")
-			return nil
-		}
-	}
-	_, err = bootUpMap.MarshalBinary()
-	if err != nil {
-		log.Error("bitmap marshal binary failed with error: ", err)
-		return nil
-	}
-	log.WithField("", existImage).Info("Bit map binary byte")
-	heartRes := &nodemanagerV2.WorkerMessage{}
-	log.Info("---------------------------------------Send resource map msg ------------------------------------")
-	return heartRes
-}
 func RegisterInfoResp(params ...interface{}) *nodemanagerV2.WorkerMessage {
 	log.Info("Register info response received params:", params)
 	nowTimeStamp := time.Now().Unix()
@@ -128,7 +89,11 @@ func RegisterInfoResp(params ...interface{}) *nodemanagerV2.WorkerMessage {
 	log.WithField("hash", signHash.String()).Info("register message sign result")
 	sign, _ := crypto.Sign(signHash.Bytes(), conf.GetConfig().SignPrivateKey)
 	log.Info("register message sign:", common.Bytes2Hex(sign))
-	modelsInfo := getModelsInfo()
+	modelsInfo := params[0].(*largeModel.ModelHandler)
+	readModels, err := modelsInfo.GetRpcModelsResp()
+	if err != nil {
+		return nil
+	}
 	hardwareInfo := getHardwareInfo()
 	nodeInfoRes := &nodemanagerV2.WorkerMessage{
 		Message: &nodemanagerV2.WorkerMessage_RegisteMessage{
@@ -138,7 +103,7 @@ func RegisterInfoResp(params ...interface{}) *nodemanagerV2.WorkerMessage {
 				BenefitAddress: conf.GetConfig().BenefitAddress,
 			},
 			Hardware:        hardwareInfo,
-			Models:          modelsInfo,
+			Models:          readModels,
 			Timestamp:       nowTimeStamp,
 			DeviceSignature: sign,
 		},
@@ -151,7 +116,11 @@ func RegisterInfoResp(params ...interface{}) *nodemanagerV2.WorkerMessage {
 func NodeInfoResp(params ...interface{}) *nodemanagerV2.WorkerMessage {
 	log.Info("Node info response received params:", params)
 	hardwareInfo := getHardwareInfo()
-	modelsInfo := getModelsInfo()
+	modelsInfo := params[0].(*largeModel.ModelHandler)
+	readModels, err := modelsInfo.GetRpcModelsResp()
+	if err != nil {
+		return nil
+	}
 	nodeInfoRes := &nodemanagerV2.WorkerMessage{
 		Message: &nodemanagerV2.WorkerMessage_NodeInfo{
 			NodeInfo: &nodemanagerV2.NodeInfoResponse{
@@ -160,7 +129,7 @@ func NodeInfoResp(params ...interface{}) *nodemanagerV2.WorkerMessage {
 				BenefitAddress: conf.GetConfig().BenefitAddress,
 			},
 			Hardware: hardwareInfo,
-			Models:   modelsInfo,
+			Models:   readModels,
 			},
 		},
 	}
@@ -185,17 +154,17 @@ func DeviceInfoResp(params ...interface{}) *nodemanagerV2.WorkerMessage {
 func DeviceUsageResp(params ...interface{}) *nodemanagerV2.WorkerMessage {
 	log.Info("DeviceUsageResp params :", params)
-	info := getHardwareInfo()
-	ramUsage := int32((1 - info.RAM.Total/info.RAM.Free) * 100)
-	diskUsage := int32((1 - info.DISK.Total/info.DISK.Free) * 100)
+	hardwareInfo := getHardwareInfo()
+	ramUsage := int32((1 - hardwareInfo.RAM.Total/hardwareInfo.RAM.Free) * 100)
+	diskUsage := int32((1 - hardwareInfo.DISK.Total/hardwareInfo.DISK.Free) * 100)
 	deviceInfoRes := &nodemanagerV2.WorkerMessage{
 		Message: &nodemanagerV2.WorkerMessage_DeviceUsage{
 			DeviceUsage: &nodemanagerV2.DeviceUsageResponse{
 				Usage: &nodemanagerV2.HardwareUsage{
-					CpuUsage:     info.CPU.Usage,
+					CpuUsage:     hardwareInfo.CPU.Usage,
 					RamUsage:     ramUsage,
 					DiskUsage:    diskUsage,
-					NetBandwidth: info.NET.Bandwidth,
+					NetBandwidth: hardwareInfo.NET.Bandwidth,
 				},
 			},
 		},
@@ -206,9 +175,9 @@ func DeviceUsageResp(params ...interface{}) *nodemanagerV2.WorkerMessage {
 func GpuUsageResp(params ...interface{}) *nodemanagerV2.WorkerMessage {
 	log.Info("DeviceUsageResp params :", params)
-	info := getHardwareInfo()
+	hardwareInfo := getHardwareInfo()
 	gpusUsage := make([]*nodemanagerV2.GPUUsage, 0)
-	for _, gpuInfo := range info.GPU {
+	for _, gpuInfo := range hardwareInfo.GPU {
 		usage := &nodemanagerV2.GPUUsage{
 			Seq:     gpuInfo.Seq,
 			MemFree: gpuInfo.MemFree,
@@ -231,10 +200,14 @@ func GpuUsageResp(params ...interface{}) *nodemanagerV2.WorkerMessage {
 func ModelListResp(params ...interface{}) *nodemanagerV2.WorkerMessage {
 	log.Info("DeviceUsageResp params :", params)
-	info := getModelsInfo()
+	modelsInfo := params[0].(*largeModel.ModelHandler)
+	readModels, err := modelsInfo.GetRpcModelsResp()
+	if err != nil {
+		return nil
+	}
 	modelListInfoRes := &nodemanagerV2.WorkerMessage{
 		Message: &nodemanagerV2.WorkerMessage_ModelsInfo{
-			ModelsInfo: info,
+			ModelsInfo: readModels,
 		},
 	}
 	log.Info("---------------------------------------Send model list msg ------------------------------------")
@@ -319,16 +292,6 @@ func GoodbyeResp(params ...interface{}) *nodemanagerV2.WorkerMessage {
 	return goodbyeMsgRes
 }
-func getModelsInfo() *nodemanagerV2.ModelsInfo {
-	installModels := make([]*nodemanagerV2.InstalledModel, 0)
-	runningModels := make([]*nodemanagerV2.RunningModel, 0)
-	res := &nodemanagerV2.ModelsInfo{
-		InstalledModels: installModels,
-		RunningModels:   runningModels,
-	}
-	return res
-}
 func getHardwareInfo() *nodemanagerV2.HardwareInfo {
 	hardwareInfo := utils.GetHardwareInfo()
 	gpusInfo := make([]*nodemanagerV2.GPUInfo, 0)
@@ -341,6 +304,8 @@ func getHardwareInfo() *nodemanagerV2.HardwareInfo {
 			}
 		}
 	}
+	diskTotal = diskTotal * conf.GetConfig().DiskUsage
+	diskFree = diskFree * conf.GetConfig().DiskUsage
 	var macAddr string
 	var bandWidth int32
 	for _, net := range hardwareInfo.Data.Networks {
...
@@ -30,7 +30,7 @@ func StartMonitor() {
 	modelHandler := largeModel.NewModelHandler(dockerOp)
-	monitorNm := NewMonitorNm(dockerOp)
+	monitorNm := NewMonitorNm(dockerOp, modelHandler)
 	go monitorNm.monitorNodeManagerSeed()
@@ -75,14 +75,17 @@ func StartMonitor() {
 	for {
 		select {
 		case <-ticker.C:
-			if !IsRecvTask {
-				log.Warn("Stop receive task.........")
-				continue
-			}
 			log.Info("Monitoring node manager client thread start......")
 			for _, client := range usedNodeManagerClient {
+				if !IsRecvTask && !client.Status {
+					client.Status = false
+				}
 				log.WithField("Endpoint", client.Endpoint).WithField("LastHeartTime", client.LastHeartTime).WithField("Is Del", client.IsDel).WithField("Status", client.Status).Info("Monitoring node manager client thread")
 			}
+			if !IsRecvTask {
+				log.Warn("Stop receive task.........")
+				continue
+			}
 			for i, managerClient := range usedNodeManagerClient {
 				if managerClient.GetStatus() && !managerClient.IsDel {
 					sub := time.Now().Sub(managerClient.GetLastHeartTime()).Seconds()
...
@@ -231,6 +231,10 @@ func (t *TaskWorker) GetAckResp(taskMsg *nodemanagerV2.PushTaskMessage) (isCanEx
 	if t.foundTaskImage(taskCmd) == "" {
 		log.WithField("imageName", taskCmd.ImageName).Error("The image is not found")
 		return
+	}
+	running, _, _ := t.foundImageIsRunning(taskCmd.ImageName)
+	if !running {
+
 	}
 	log.Info("found task image finished")
 	isCanExecute = true
@@ -249,13 +253,14 @@ func (t *TaskWorker) foundTaskImage(taskCmd *models.TaskCmd) (imageId string) {
 		imageId = ""
 		return
 	}
+	foundImageName := fmt.Sprintf("%s-%s", taskCmd.ImageName, conf.GetConfig().OpSys)
 	isFound := false
 	for _, image := range images {
 		if isFound {
 			break
 		}
 		for _, tag := range image.RepoTags {
-			if tag == taskCmd.ImageName {
+			if tag == foundImageName {
 				imageId = image.ID
 				isFound = true
 				log.Info("The image found success:", image.ID)
...
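The lookup in foundTaskImage now matches repo tags against an OS-suffixed name rather than the raw ImageName. A small illustration, assuming conf.GetConfig().OpSys returns a value such as "linux" (the OpSys value is an assumption); a locally present image would need to carry the suffixed tag for GetAckResp to acknowledge the task.

```go
package main

import "fmt"

func main() {
	// With taskCmd.ImageName = "llm-server:latest" and conf.GetConfig().OpSys = "linux"
	// (assumed value), the tag foundTaskImage now searches for is:
	foundImageName := fmt.Sprintf("%s-%s", "llm-server:latest", "linux")
	fmt.Println(foundImageName) // llm-server:latest-linux
}
```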
@@ -53,7 +53,7 @@ func NewDockerOp() *DockerOp {
 		Reason:           "",
 		dockerClient:     dockerClient,
 		SignApi:          make(map[string]string, 0),
-		ModelsInfo:       make([]*models.ModelInfo, 1000),
+		ModelsInfo:       make([]*models.ModelInfo, 100000),
 		UsedExternalPort: make(map[int64]bool, 0),
 		ReportModelIds:   make([]uint64, 0),
 		BootUpModelId:    make(map[string]uint64, 0),
...
@@ -13,6 +13,7 @@ func init() {
 	beego.Router("/api/v1/power/update/recv/status", &controllers.NodeController{}, "post:UpdateRecvStatus")
 	beego.Router("/api/v1/power/get/recv/status", &controllers.NodeController{}, "get:GetRecvStatus")
 	beego.Router("/api/v1/power/get/conf", &controllers.NodeController{}, "get:GetConfigInfo")
+	beego.Router("/api/v1/power/get/current/benefit", &controllers.NodeController{}, "get:GetBenefit")
 	beego.Router("/api/v1/power/get/running/state", &controllers.StateController{}, "get:GetRunningState")
 	beego.Router("/api/v1/power/get/worker/info", &controllers.StateController{}, "get:GetWorkerInfo")
 	beego.Router("/api/v1/power/list/gpu/info", &controllers.StateController{}, "get:GetListGpuInfo")
...
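A usage sketch for the new benefit-address route registered above; the host and port are assumptions and depend on how this worker's beego HTTP listener is configured.

```go
package main

import (
	"fmt"
	"io"
	"net/http"
)

func main() {
	// GET the benefit address exposed by NodeController.GetBenefit
	// (port 8080 is an assumption; adjust to the local beego config).
	resp, err := http.Get("http://127.0.0.1:8080/api/v1/power/get/current/benefit")
	if err != nil {
		panic(err)
	}
	defer resp.Body.Close()
	body, _ := io.ReadAll(resp.Body)
	// Prints the JSON wrapper produced by ResponseInfo(200, "get benefit address successful", ...)
	fmt.Println(string(body))
}
```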
@@ -7,7 +7,7 @@ import (
 	"example.com/m/operate"
 	"fmt"
 	"github.com/golang/groupcache/lru"
-	nodeManagerV1 "github.com/odysseus/odysseus-protocol/gen/proto/go/nodemanager/v1"
+	nodeManagerV2 "github.com/odysseus/odysseus-protocol/gen/proto/go/nodemanager/v2"
 	"net/http"
 	"sync"
 	"testing"
@@ -49,7 +49,7 @@ func TestTaskHandler_computeTaskHandler(t1 *testing.T) {
 		lruCache       *lru.Cache
 		DockerOp       *operate.DockerOp
 		CmdOp          *operate.Command
-		TaskMsg        chan *nodeManagerV1.PushTaskMessage
+		TaskMsg        chan *nodeManagerV2.PushTaskMessage
 		TaskRespHeader map[string][]byte
 		TaskRespBody   map[string][]byte
 		TaskIsSuccess  map[string]bool
@@ -57,7 +57,7 @@ func TestTaskHandler_computeTaskHandler(t1 *testing.T) {
 	}
 	type args struct {
-		taskMsg *nodeManagerV1.PushTaskMessage
+		taskMsg *nodeManagerV2.PushTaskMessage
 	}
 	m := &models.TaskCmd{
 		ImageName: "llm-server:latest",
@@ -82,7 +82,7 @@ func TestTaskHandler_computeTaskHandler(t1 *testing.T) {
 		return
 	}
 	n := args{
-		taskMsg: &nodeManagerV1.PushTaskMessage{
+		taskMsg: &nodeManagerV2.PushTaskMessage{
 			Workload:  111,
 			TaskCmd:   string(marshal),
 			TaskParam: taskParamBytes,
@@ -100,7 +100,7 @@ func TestTaskHandler_computeTaskHandler(t1 *testing.T) {
 		wg:             &sync.WaitGroup{},
 		lruCache:       lru.New(100),
 		DockerOp:       operate.NewDockerOp(),
-		TaskMsg:        make(chan *nodeManagerV1.PushTaskMessage, 0),
+		TaskMsg:        make(chan *nodeManagerV2.PushTaskMessage, 0),
 		TaskRespHeader: make(map[string][]byte, 0),
 		TaskRespBody:   make(map[string][]byte, 0),
 		TaskIsSuccess:  make(map[string]bool, 0),
...