Commit 14cdac1e authored by duanjinfei's avatar duanjinfei

update msg resp

parent 59f14f68
...@@ -21,7 +21,7 @@ var ( ...@@ -21,7 +21,7 @@ var (
func init() { func init() {
RootCmd.PersistentFlags().StringVarP(&rewardAddr, "reward", "r", "0x0Fb196385c8826e3806ebA2cA2cb78B26E08fEEe", "please enter a reward address") RootCmd.PersistentFlags().StringVarP(&rewardAddr, "reward", "r", "0x0Fb196385c8826e3806ebA2cA2cb78B26E08fEEe", "please enter a reward address")
RootCmd.PersistentFlags().StringVarP(&externalIp, "externalIp", "e", "192.168.1.120", "please enter server external ip address") RootCmd.PersistentFlags().StringVarP(&externalIp, "externalIp", "e", "192.168.1.120", "please enter server external ip address")
RootCmd.PersistentFlags().StringVarP(&opSys, "opSys", "s", "", "please enter you op sys name : win、linux、mac") RootCmd.PersistentFlags().StringVarP(&opSys, "opSys", "s", "", "please enter you op sys name : win、linux")
RootCmd.PersistentFlags().BoolVarP(&debug, "debug", "d", false, "set log level debug") RootCmd.PersistentFlags().BoolVarP(&debug, "debug", "d", false, "set log level debug")
cobra.OnInitialize(initConfig) cobra.OnInitialize(initConfig)
} }
......
...@@ -9,6 +9,7 @@ import ( ...@@ -9,6 +9,7 @@ import (
"example.com/m/utils" "example.com/m/utils"
nodemanagerV2 "github.com/odysseus/odysseus-protocol/gen/proto/go/nodemanager/v2" nodemanagerV2 "github.com/odysseus/odysseus-protocol/gen/proto/go/nodemanager/v2"
"io" "io"
"time"
) )
type NodeController struct { type NodeController struct {
...@@ -119,6 +120,10 @@ func (c *NodeController) UpdateRecvStatus() { ...@@ -119,6 +120,10 @@ func (c *NodeController) UpdateRecvStatus() {
c.ResponseInfo(500, "param error", "") c.ResponseInfo(500, "param error", "")
return return
} }
if !nm.IsRecvTask && req.IsRecv {
nm.RunningState.RunningTime = time.Now().Unix()
nm.RunningState.CompletedTaskCount = 0
}
nm.IsRecvTask = req.IsRecv nm.IsRecvTask = req.IsRecv
c.ResponseInfo(200, "update recv status successful", "") c.ResponseInfo(200, "update recv status successful", "")
} }
......
...@@ -18,6 +18,17 @@ func (c *StateController) GetRunningState() { ...@@ -18,6 +18,17 @@ func (c *StateController) GetRunningState() {
c.ResponseInfo(200, "get running state successful", res) c.ResponseInfo(200, "get running state successful", res)
} }
func (c *StateController) GetRunningTp() {
info := utils.GetHardwareInfo()
var totalTemp int64
for _, gpu := range info.Data.Gpus {
totalTemp += gpu.Temp
}
avgTemp := totalTemp / int64(len(info.Data.Gpus))
c.ResponseInfo(200, "get running state successful", avgTemp)
}
func (c *StateController) GetWorkerInfo() { func (c *StateController) GetWorkerInfo() {
res := models.WorkerAccount{ res := models.WorkerAccount{
WorkerAcc: conf.GetConfig().SignPublicAddress.Hex(), WorkerAcc: conf.GetConfig().SignPublicAddress.Hex(),
......
package db package db
import ( import (
"encoding/json"
"example.com/m/log" "example.com/m/log"
"example.com/m/models"
"github.com/syndtr/goleveldb/leveldb" "github.com/syndtr/goleveldb/leveldb"
"github.com/syndtr/goleveldb/leveldb/iterator" "github.com/syndtr/goleveldb/leveldb/iterator"
) )
...@@ -18,9 +20,14 @@ func init() { ...@@ -18,9 +20,14 @@ func init() {
} }
} }
func Put(key string, value []byte) error { func Put(key string, value any) error {
valueByte, err := json.Marshal(value)
if err != nil {
log.Error("Leveldb put data failed:", err)
return err
}
// 存储数据 // 存储数据
err := dbInstance.Put([]byte(key), value, nil) err = dbInstance.Put([]byte(key), valueByte, nil)
if err != nil { if err != nil {
log.Error("Leveldb put data failed:", err) log.Error("Leveldb put data failed:", err)
return err return err
...@@ -48,6 +55,22 @@ func Get(key string) ([]byte, error) { ...@@ -48,6 +55,22 @@ func Get(key string) ([]byte, error) {
return data, nil return data, nil
} }
func GetModel(key string) (*models.ModelInfo, error) {
data, err := dbInstance.Get([]byte(key), nil)
if err != nil {
log.Error("Leveldb get data failed:", err)
return nil, err
}
log.WithField("key", key).WithField("value", data).Info("leveldb data")
imageInfo := &models.ModelInfo{}
err = json.Unmarshal(data, imageInfo)
if err != nil {
log.Error("Json decode image data failed:", err)
return nil, err
}
return imageInfo, nil
}
func Delete(key []byte) error { func Delete(key []byte) error {
err := dbInstance.Delete(key, nil) err := dbInstance.Delete(key, nil)
if err != nil { if err != nil {
......
...@@ -3,6 +3,7 @@ package largeModel ...@@ -3,6 +3,7 @@ package largeModel
import ( import (
"encoding/json" "encoding/json"
"example.com/m/conf" "example.com/m/conf"
"example.com/m/db"
"example.com/m/log" "example.com/m/log"
"example.com/m/models" "example.com/m/models"
"example.com/m/operate" "example.com/m/operate"
...@@ -62,12 +63,6 @@ func (m *ModelHandler) MonitorModelInfo() { ...@@ -62,12 +63,6 @@ func (m *ModelHandler) MonitorModelInfo() {
continue continue
} }
modelInfosResp := resp.Data modelInfosResp := resp.Data
//imageNameMap, err := m.dockerOp.PsImageNameMap()
//if err != nil {
// log.Error("Docker op ps images failed:", err)
// continue
//}
reportTaskIds := make([]uint64, 0)
for _, modelInfo := range modelInfosResp { for _, modelInfo := range modelInfosResp {
if modelInfo.ImageName == "" { if modelInfo.ImageName == "" {
continue continue
...@@ -77,38 +72,19 @@ func (m *ModelHandler) MonitorModelInfo() { ...@@ -77,38 +72,19 @@ func (m *ModelHandler) MonitorModelInfo() {
if len(split) != 2 { if len(split) != 2 {
continue continue
} }
{
//if !imageNameMap[modelInfo.ImageName] {
// todo: 判断机器资源是否够用
//isPull := m.isResourceEnough(modelInfo)
// todo: 如果够用
//if isPull && modelInfo.PublishStatus == models.ModelPublishStatusYes {
// log.WithField("model image name", modelInfo.ImageName).Info("pulling image")
// go m.dockerOp.PullImage(modelInfo)
//}
//} else {
//
//}
}
log.WithField("name", modelInfo.ImageName).Info("The image name is already") log.WithField("name", modelInfo.ImageName).Info("The image name is already")
m.dockerOp.BootUpModelId[modelInfo.ImageName] = modelInfo.TaskId
reportTaskIds = append(reportTaskIds, modelInfo.TaskId)
m.dockerOp.SignApi[modelInfo.ImageName] = modelInfo.SignUrl m.dockerOp.SignApi[modelInfo.ImageName] = modelInfo.SignUrl
} err := db.Put(modelInfo.ImageName, modelInfo)
m.dockerOp.ModelsInfo = modelInfosResp if err != nil {
m.dockerOp.ReportModelIds = reportTaskIds log.WithError(err).Error("Put db error")
err = os.WriteFile(m.modelsFileName, bodyBytes, 0644) continue
if err != nil { }
log.WithError(err).Error("Error writing models.json")
} }
ticker = time.NewTicker(time.Minute * 10) ticker = time.NewTicker(time.Minute * 10)
} }
} }
} }
func (m *ModelHandler) heatDataHandler(modelInfosResp []*models.ModelInfo) {
}
func (m *ModelHandler) ReadModels() ([]*models.ModelInfo, error) { func (m *ModelHandler) ReadModels() ([]*models.ModelInfo, error) {
bodyBytes, err := os.ReadFile(m.modelsFileName) bodyBytes, err := os.ReadFile(m.modelsFileName)
if err != nil { if err != nil {
...@@ -131,15 +107,51 @@ func (m *ModelHandler) ReadModels() ([]*models.ModelInfo, error) { ...@@ -131,15 +107,51 @@ func (m *ModelHandler) ReadModels() ([]*models.ModelInfo, error) {
} }
return resp.Data, nil return resp.Data, nil
} }
func (m *ModelHandler) GetRpcModelsResp() (*nodemanagerV2.ModelsInfo, error) {
return nil, nil func (m *ModelHandler) GetRpcModelsResp() (*nodemanagerV2.ModelsInfo, error) {
installedModels := make([]*nodemanagerV2.InstalledModel, 0)
runningModels := make([]*nodemanagerV2.RunningModel, 0)
readModels, err := m.ReadModels()
if err != nil {
log.WithError(err).Error("Error reading models")
return nil, err
}
for _, model := range readModels {
if model.IsInstalled {
diskSize, err := strconv.ParseInt(model.HardwareRequire.DiskSize, 10, 64)
if err != nil {
return nil, err
}
model := &nodemanagerV2.InstalledModel{
ModelId: strconv.FormatUint(model.TaskId, 10),
DiskSize: diskSize,
InstalledTime: model.SetupTime,
LastRunTime: model.LastRunTime,
}
installedModels = append(installedModels, model)
}
if model.IsRunning {
model := &nodemanagerV2.RunningModel{
ModelId: strconv.FormatUint(model.TaskId, 10),
}
runningModels = append(runningModels, model)
}
}
res := &nodemanagerV2.ModelsInfo{
InstalledModels: installedModels,
RunningModels: runningModels,
}
return res, nil
} }
func (m *ModelHandler) isResourceEnough(modelInfo *models.ModelInfo) bool { func (m *ModelHandler) isResourceEnough(modelInfo *models.ModelInfo) bool {
return true return true
} }
func (m *ModelHandler) checkGpuUsage(modelInfo *models.ModelInfo) bool {
return false
}
func (m *ModelHandler) checkDiskUsage(modelInfo *models.ModelInfo) bool { func (m *ModelHandler) checkDiskUsage(modelInfo *models.ModelInfo) bool {
totalSize, usedSize, availSize, usageSize, err := m.dockerOp.GetDockerInfo() totalSize, usedSize, availSize, usageSize, err := m.dockerOp.GetDockerInfo()
if err != nil { if err != nil {
...@@ -162,8 +174,3 @@ func (m *ModelHandler) checkDiskUsage(modelInfo *models.ModelInfo) bool { ...@@ -162,8 +174,3 @@ func (m *ModelHandler) checkDiskUsage(modelInfo *models.ModelInfo) bool {
} }
return true return true
} }
func (m *ModelHandler) checkHeat() bool {
return false
}
...@@ -60,10 +60,6 @@ func (m *MonitorNm) monitorNmClient() { ...@@ -60,10 +60,6 @@ func (m *MonitorNm) monitorNmClient() {
msgRespWorker.RegisterMsgResp(nodeManager, worker, DeviceInfoResp, nil) msgRespWorker.RegisterMsgResp(nodeManager, worker, DeviceInfoResp, nil)
log.Info("------------------------Send deviceInfo message ended------------------------") log.Info("------------------------Send deviceInfo message ended------------------------")
if len(m.DockerOp.ReportModelIds) == 0 {
//params := utils.BuildParams(m.DockerOp.ReportModelIds, []uint64{0})
//msgRespWorker.RegisterMsgResp(nodeManager, worker, SubmitResourceMapRes, params)
}
log.Info("------------------------Send once-off message ended------------------------") log.Info("------------------------Send once-off message ended------------------------")
nodeManagerHandler := NewNodeManagerHandler(nodeManager, worker, msgRespWorker, taskMsgWorker) nodeManagerHandler := NewNodeManagerHandler(nodeManager, worker, msgRespWorker, taskMsgWorker)
......
...@@ -119,6 +119,7 @@ func NodeInfoResp(params ...interface{}) *nodemanagerV2.WorkerMessage { ...@@ -119,6 +119,7 @@ func NodeInfoResp(params ...interface{}) *nodemanagerV2.WorkerMessage {
modelsInfo := params[0].(*largeModel.ModelHandler) modelsInfo := params[0].(*largeModel.ModelHandler)
readModels, err := modelsInfo.GetRpcModelsResp() readModels, err := modelsInfo.GetRpcModelsResp()
if err != nil { if err != nil {
log.WithError(err).Error("Error getting rpc models response")
return nil return nil
} }
nodeInfoRes := &nodemanagerV2.WorkerMessage{ nodeInfoRes := &nodemanagerV2.WorkerMessage{
......
...@@ -4,6 +4,7 @@ import ( ...@@ -4,6 +4,7 @@ import (
"bytes" "bytes"
"encoding/json" "encoding/json"
"example.com/m/conf" "example.com/m/conf"
"example.com/m/db"
"example.com/m/log" "example.com/m/log"
"example.com/m/models" "example.com/m/models"
"example.com/m/operate" "example.com/m/operate"
...@@ -220,7 +221,10 @@ func (t *TaskWorker) GetAckResp(taskMsg *nodemanagerV2.PushTaskMessage) (isCanEx ...@@ -220,7 +221,10 @@ func (t *TaskWorker) GetAckResp(taskMsg *nodemanagerV2.PushTaskMessage) (isCanEx
isSuccess := value.(bool) isSuccess := value.(bool)
log.WithField("isSuccess", isSuccess).Info("Task exec info") log.WithField("isSuccess", isSuccess).Info("Task exec info")
if !isSuccess && !t.lastExecTaskStartTime.IsZero() { if !isSuccess && !t.lastExecTaskStartTime.IsZero() {
lastTaskImageInfo := t.DockerOp.GetImageInfo(t.lastExecTaskImageName) lastTaskImageInfo, err := db.GetModel(t.lastExecTaskImageName)
if err != nil {
return false, 0, 0, 0
}
since := time.Since(t.lastExecTaskStartTime) since := time.Since(t.lastExecTaskStartTime)
queueWaitTime = lastTaskImageInfo.EstimatExeTime - int64(since.Seconds()) queueWaitTime = lastTaskImageInfo.EstimatExeTime - int64(since.Seconds())
if queueWaitTime < 0 { if queueWaitTime < 0 {
...@@ -238,7 +242,10 @@ func (t *TaskWorker) GetAckResp(taskMsg *nodemanagerV2.PushTaskMessage) (isCanEx ...@@ -238,7 +242,10 @@ func (t *TaskWorker) GetAckResp(taskMsg *nodemanagerV2.PushTaskMessage) (isCanEx
} }
log.Info("found task image finished") log.Info("found task image finished")
isCanExecute = true isCanExecute = true
modelInfo := t.DockerOp.GetImageInfo(taskCmd.ImageName) modelInfo, err := db.GetModel(t.lastExecTaskImageName)
if err != nil {
return false, 0, 0, 0
}
if modelInfo != nil { if modelInfo != nil {
bootUpTime = modelInfo.StartUpTime bootUpTime = modelInfo.StartUpTime
executeTime = modelInfo.EstimatExeTime executeTime = modelInfo.EstimatExeTime
...@@ -432,7 +439,7 @@ func (op *TaskOp) getFileCache(respStr string, dockerOp *operate.DockerOp) (stri ...@@ -432,7 +439,7 @@ func (op *TaskOp) getFileCache(respStr string, dockerOp *operate.DockerOp) (stri
log.WithField("isBase64", isBase64).Info("resp str info") log.WithField("isBase64", isBase64).Info("resp str info")
if isBase64 { if isBase64 {
log.WithField("taskId", op.taskMsg.TaskId).WithField("format", respFormat).WithField("suffix", suffix).Info("Parse container resp") log.WithField("taskId", op.taskMsg.TaskId).WithField("format", respFormat).WithField("suffix", suffix).Info("Parse container resp")
queryString := utils.MatchFileCacheQueryString(op.taskParam.Headers, op.taskCmd.ImageName, dockerOp.ModelsInfo, respFormat) queryString := utils.MatchFileCacheQueryString(op.taskParam.Headers, op.taskCmd.ImageName, respFormat)
ossUri, err := op.uploadOSS(op.taskMsg.TaskId, queryString, decodeByte, suffix) ossUri, err := op.uploadOSS(op.taskMsg.TaskId, queryString, decodeByte, suffix)
if err != nil || ossUri == "" { if err != nil || ossUri == "" {
log.WithError(err).Error("upload image into file cache failed") log.WithError(err).Error("upload image into file cache failed")
......
...@@ -30,10 +30,6 @@ type DockerOp struct { ...@@ -30,10 +30,6 @@ type DockerOp struct {
dockerClient *client.Client dockerClient *client.Client
UsedExternalPort map[int64]bool UsedExternalPort map[int64]bool
SignApi map[string]string SignApi map[string]string
ModelsInfo []*models.ModelInfo
ReportModelIds []uint64
BootUpModelId map[string]uint64
//RunningImages map[string]bool
} }
func init() { func init() {
...@@ -53,23 +49,11 @@ func NewDockerOp() *DockerOp { ...@@ -53,23 +49,11 @@ func NewDockerOp() *DockerOp {
Reason: "", Reason: "",
dockerClient: dockerClient, dockerClient: dockerClient,
SignApi: make(map[string]string, 0), SignApi: make(map[string]string, 0),
ModelsInfo: make([]*models.ModelInfo, 100000),
UsedExternalPort: make(map[int64]bool, 0), UsedExternalPort: make(map[int64]bool, 0),
ReportModelIds: make([]uint64, 0),
BootUpModelId: make(map[string]uint64, 0),
//RunningImages: make(map[string]bool, 0), //RunningImages: make(map[string]bool, 0),
} }
} }
func (d *DockerOp) GetImageInfo(imageName string) *models.ModelInfo {
for _, info := range d.ModelsInfo {
if info.ImageName == imageName {
return info
}
}
return nil
}
func (d *DockerOp) GetContainerSign(taskMsg *nodemanagerV2.PushTaskMessage, taskRes []byte) []byte { func (d *DockerOp) GetContainerSign(taskMsg *nodemanagerV2.PushTaskMessage, taskRes []byte) []byte {
reqBody := &models.TaskReq{ reqBody := &models.TaskReq{
TaskId: taskMsg.TaskId, TaskId: taskMsg.TaskId,
......
...@@ -14,9 +14,10 @@ func init() { ...@@ -14,9 +14,10 @@ func init() {
beego.Router("/api/v1/power/get/recv/status", &controllers.NodeController{}, "get:GetRecvStatus") beego.Router("/api/v1/power/get/recv/status", &controllers.NodeController{}, "get:GetRecvStatus")
beego.Router("/api/v1/power/get/conf", &controllers.NodeController{}, "get:GetConfigInfo") beego.Router("/api/v1/power/get/conf", &controllers.NodeController{}, "get:GetConfigInfo")
beego.Router("/api/v1/power/get/current/benefit", &controllers.NodeController{}, "get:GetBenefit") beego.Router("/api/v1/power/get/current/benefit", &controllers.NodeController{}, "get:GetBenefit")
beego.Router("/api/v1/power/get/running/tp", &controllers.StateController{}, "get:GetRunningTp")
beego.Router("/api/v1/power/get/running/state", &controllers.StateController{}, "get:GetRunningState") beego.Router("/api/v1/power/get/running/state", &controllers.StateController{}, "get:GetRunningState")
beego.Router("/api/v1/power/get/worker/info", &controllers.StateController{}, "get:GetWorkerInfo") beego.Router("/api/v1/power/get/worker/info", &controllers.StateController{}, "get:GetWorkerInfo")
beego.Router("/api/v1/power/list/gpu/info", &controllers.StateController{}, "get:GetListGpuInfo") beego.Router("/api/v1/power/list/gpu/info", &controllers.StateController{}, "get:GetListGpuInfo")
beego.Router("/api/v1/power/get/gpu/info", &controllers.StateController{}, "post:GetGpuUsageInfo") beego.Router("/api/v1/power/get/gpu/info", &controllers.StateController{}, "post:GetGpuUsageInfo")
beego.Router("/api/v1/power/get/hardware/info", &controllers.StateController{}, "post:GetOtherHardwareInfo") beego.Router("/api/v1/power/get/hardware/info", &controllers.StateController{}, "get:GetOtherHardwareInfo")
} }
...@@ -9,6 +9,7 @@ import ( ...@@ -9,6 +9,7 @@ import (
"crypto/rand" "crypto/rand"
"encoding/base64" "encoding/base64"
"encoding/json" "encoding/json"
"example.com/m/db"
"example.com/m/log" "example.com/m/log"
"example.com/m/models" "example.com/m/models"
//"example.com/m/nm" //"example.com/m/nm"
...@@ -135,7 +136,7 @@ func IsBase64ImageStr(imageStr string) (bool, []byte, string, string) { ...@@ -135,7 +136,7 @@ func IsBase64ImageStr(imageStr string) (bool, []byte, string, string) {
return true, decodeBytes, formatStr, suffix return true, decodeBytes, formatStr, suffix
} }
func MatchFileCacheQueryString(params map[string][]string, taskImageName string, modelsInfo []*models.ModelInfo, contentType string) string { func MatchFileCacheQueryString(params map[string][]string, taskImageName string, contentType string) string {
values := url.Values{} values := url.Values{}
isExistFileExpires := false isExistFileExpires := false
for key, value := range params { for key, value := range params {
...@@ -145,22 +146,14 @@ func MatchFileCacheQueryString(params map[string][]string, taskImageName string, ...@@ -145,22 +146,14 @@ func MatchFileCacheQueryString(params map[string][]string, taskImageName string,
break break
} }
} }
isModelExistFileExpires := false
if !isExistFileExpires { if !isExistFileExpires {
for _, info := range modelsInfo { modelInfo, _ := db.GetModel(taskImageName)
if info == nil { if modelInfo != nil && modelInfo.FileExpiresTime != "" {
continue values.Add(models.ResultFileExpiresDB, modelInfo.FileExpiresTime)
} } else {
if info.ImageName == taskImageName && info.FileExpiresTime != "" { values.Add(models.ResultFileExpiresDB, "600")
values.Add(models.ResultFileExpiresDB, info.FileExpiresTime)
isModelExistFileExpires = true
break
}
} }
} }
if !isModelExistFileExpires {
values.Add(models.ResultFileExpiresDB, "600")
}
values.Add(models.ContentType, contentType) values.Add(models.ContentType, contentType)
return values.Encode() return values.Encode()
} }
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment