Commit 142a2d16 authored by duanjinfei's avatar duanjinfei

update monitor modelInfo

parent 1bfec526
...@@ -13,10 +13,10 @@ func (d *BaseController) ResponseInfo(code int, msg interface{}, result interfac ...@@ -13,10 +13,10 @@ func (d *BaseController) ResponseInfo(code int, msg interface{}, result interfac
switch code { switch code {
case 500: case 500:
logs.Error(msg, result) logs.Error(msg, result)
d.Data["json"] = result d.Data["json"] = map[string]interface{}{"code": "500", "msg": msg, "data": result}
case 200: case 200:
logs.Info(msg, result) logs.Info(msg, result)
d.Data["json"] = result d.Data["json"] = map[string]interface{}{"code": "200", "msg": msg, "data": result}
} }
d.ServeJSON() d.ServeJSON()
} }
...@@ -73,7 +73,7 @@ func (c *NodeController) AddNodeManager() { ...@@ -73,7 +73,7 @@ func (c *NodeController) AddNodeManager() {
} }
func (c *NodeController) GetNodeManager() { func (c *NodeController) GetNodeManagers() {
manager := nm.GetNodeManagers() manager := nm.GetNodeManagers()
res := make([]*nm.NodeManager, 0) res := make([]*nm.NodeManager, 0)
for _, nodeManager := range manager { for _, nodeManager := range manager {
......
...@@ -3,6 +3,7 @@ package main ...@@ -3,6 +3,7 @@ package main
import ( import (
"example.com/m/log" "example.com/m/log"
"example.com/m/nm" "example.com/m/nm"
_ "example.com/m/routers"
"github.com/astaxie/beego" "github.com/astaxie/beego"
_ "net/http/pprof" _ "net/http/pprof"
) )
...@@ -14,6 +15,6 @@ func main() { ...@@ -14,6 +15,6 @@ func main() {
//}() //}()
//runtime.SetBlockProfileRate(1) // 开启对阻塞操作的跟踪,block //runtime.SetBlockProfileRate(1) // 开启对阻塞操作的跟踪,block
//runtime.SetMutexProfileFraction(1) // 开启对锁调用的跟踪,mutex //runtime.SetMutexProfileFraction(1) // 开启对锁调用的跟踪,mutex
nm.StartMonitor() go nm.StartMonitor()
beego.Run() beego.Run()
} }
...@@ -36,9 +36,9 @@ type ModelInfo struct { ...@@ -36,9 +36,9 @@ type ModelInfo struct {
} }
type ComputeResult struct { type ComputeResult struct {
Code string `json:"code"` Code string `json:"code"`
Msg string `json:"msg"` Msg string `json:"msg"`
Content string `json:"content"` Data string `json:"data"`
} }
type NodeManagerClient struct { type NodeManagerClient struct {
......
...@@ -12,3 +12,9 @@ type NodeManagerReq struct { ...@@ -12,3 +12,9 @@ type NodeManagerReq struct {
PublicKey string `json:"public_key"` PublicKey string `json:"public_key"`
EndPoint string `json:"end_point"` EndPoint string `json:"end_point"`
} }
type Resp struct {
Code int `json:"code"`
Msg string `json:"msg"`
Data string `json:"data"`
}
package nm package nm
import ( import (
"bytes"
"encoding/json"
"example.com/m/log" "example.com/m/log"
"example.com/m/models" "example.com/m/models"
"example.com/m/operate" "example.com/m/operate"
nodeManagerV1 "github.com/odysseus/odysseus-protocol/gen/proto/go/nodemanager/v1" nodeManagerV1 "github.com/odysseus/odysseus-protocol/gen/proto/go/nodemanager/v1"
"io"
"net/http"
"strings"
"time" "time"
) )
func monitorModelInfo(dockerOp *operate.DockerOp) { func monitorModelInfo(dockerOp *operate.DockerOp) {
// TODO: 向api端获取model信息 client := &http.Client{}
info := &models.ModelInfo{
TaskId: 1,
User: "",
Pwd: "",
Repository: "",
SignUrl: "http://192.168.1.120:8888/llm/test/get/sign",
ImageName: "onlydd/llm-server:0119",
DiskSize: 10000,
MemorySize: 10000,
IsImageExist: false,
}
ticker := time.NewTicker(time.Second * 1) ticker := time.NewTicker(time.Second * 1)
for { for {
select { select {
case <-ticker.C: case <-ticker.C:
resp := make([]*models.ModelInfo, 0) modelInfosResp := make([]*models.ModelInfo, 0)
resp = append(resp, info) modelResp, err := client.Get("http://192.168.1.8:8087/api/task/taskheat")
if err != nil {
log.Error("Error getting model info from client failed:", err)
continue
}
bodyBytes, err := io.ReadAll(modelResp.Body)
if err != nil {
log.Error("Error reading model response failed:", err)
continue
}
resp := &models.Resp{}
err = json.Unmarshal(bodyBytes, resp)
if err != nil {
log.Error("Unmarshal model response failed:", err)
continue
}
dataResp, err := io.ReadAll(bytes.NewBufferString(resp.Data))
if err != nil {
log.Error("Error reading model response data failed:", err)
continue
}
err = json.Unmarshal(dataResp, &modelInfosResp)
if err != nil {
log.Error("Unmarshal model response failed:", err)
continue
}
modelInfoMap := make(map[uint64]*models.ModelInfo, 0) modelInfoMap := make(map[uint64]*models.ModelInfo, 0)
for _, modelInfo := range resp { for _, modelInfo := range modelInfosResp {
modelInfoMap[modelInfo.TaskId] = modelInfo modelInfoMap[modelInfo.TaskId] = modelInfo
} }
imageNameMap, err := dockerOp.PsImageNameMap() imageNameMap, err := dockerOp.PsImageNameMap()
...@@ -36,8 +55,12 @@ func monitorModelInfo(dockerOp *operate.DockerOp) { ...@@ -36,8 +55,12 @@ func monitorModelInfo(dockerOp *operate.DockerOp) {
log.Error("Docker op ps images failed:", err) log.Error("Docker op ps images failed:", err)
continue continue
} }
for _, modelInfo := range modelInfos { for _, modelInfo := range modelInfosResp {
if modelInfo.IsImageExist { if modelInfo.ImageName == "" {
continue
}
split := strings.Split(modelInfo.ImageName, ":")
if len(split) != 2 {
continue continue
} }
if !imageNameMap[modelInfo.ImageName] { if !imageNameMap[modelInfo.ImageName] {
...@@ -48,16 +71,16 @@ func monitorModelInfo(dockerOp *operate.DockerOp) { ...@@ -48,16 +71,16 @@ func monitorModelInfo(dockerOp *operate.DockerOp) {
go dockerOp.PullImage(modelInfo) go dockerOp.PullImage(modelInfo)
modelInfo.IsImageExist = true modelInfo.IsImageExist = true
dockerOp.ModelTaskIdChan <- modelInfo.TaskId dockerOp.ModelTaskIdChan <- modelInfo.TaskId
dockerOp.IsReportModelTaskId[modelInfo.TaskId] = true
} }
} else if !dockerOp.IsReportModelTaskId[modelInfo.TaskId] { } else if !dockerOp.IsReportModelTaskId[modelInfo.TaskId] {
dockerOp.ModelTaskIdChan <- modelInfo.TaskId dockerOp.ModelTaskIdChan <- modelInfo.TaskId
dockerOp.IsReportModelTaskId[modelInfo.TaskId] = true dockerOp.IsReportModelTaskId[modelInfo.TaskId] = true
} }
if modelInfoMap[modelInfo.TaskId] == nil { dockerOp.SignApi[modelInfo.ImageName] = modelInfo.SignUrl
dockerOp.SignApi[modelInfo.ImageName] = modelInfo.SignUrl dockerOp.ModelsInfo = append(dockerOp.ModelsInfo, modelInfo)
dockerOp.ModelsInfo = append(dockerOp.ModelsInfo, modelInfo)
}
} }
ticker = time.NewTicker(time.Minute * 65)
} }
} }
} }
......
...@@ -22,13 +22,11 @@ var ( ...@@ -22,13 +22,11 @@ var (
usedNodeManagerClient []*models.NodeManagerClient usedNodeManagerClient []*models.NodeManagerClient
nodeManagerClientChan chan *models.NodeManagerClient nodeManagerClientChan chan *models.NodeManagerClient
nodeManagerMsgChan chan *nodeManagerV1.ManagerMessage nodeManagerMsgChan chan *nodeManagerV1.ManagerMessage
modelInfos []*models.ModelInfo
) )
func init() { func init() {
nodeManagerArr = make([]*NodeManager, 0) nodeManagerArr = make([]*NodeManager, 0)
usedNodeManagerClient = make([]*models.NodeManagerClient, 0) usedNodeManagerClient = make([]*models.NodeManagerClient, 0)
modelInfos = make([]*models.ModelInfo, 0)
nodeManagerClientChan = make(chan *models.NodeManagerClient, 0) nodeManagerClientChan = make(chan *models.NodeManagerClient, 0)
nodeManagerMsgChan = make(chan *nodeManagerV1.ManagerMessage, 1000) nodeManagerMsgChan = make(chan *nodeManagerV1.ManagerMessage, 1000)
} }
......
...@@ -95,7 +95,7 @@ func (d *DockerOp) GetContainerSign(taskMsg *nodemanagerv1.PushTaskMessage, task ...@@ -95,7 +95,7 @@ func (d *DockerOp) GetContainerSign(taskMsg *nodemanagerv1.PushTaskMessage, task
log.Error("Failed to parse docker response body") log.Error("Failed to parse docker response body")
return nil return nil
} }
sign := res.Content sign := res.Data
log.Info("Container sign:", sign) log.Info("Container sign:", sign)
return common.Hex2Bytes(sign) return common.Hex2Bytes(sign)
} }
......
...@@ -6,5 +6,5 @@ import ( ...@@ -6,5 +6,5 @@ import (
) )
func init() { func init() {
beego.Router("/llm/test/get/sign", &controllers.NodeController{}, "post:GetContainerSign") beego.Router("/power/node/get/nm", &controllers.NodeController{}, "post:GetNodeManagers")
} }
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment