Commit 26212fdb authored by duanjinfei's avatar duanjinfei

update msg resp

parent ef936217
......@@ -132,7 +132,7 @@ func (c *NodeController) UpdateRecvStatus() {
c.ResponseInfo(500, "The task current is recv status , don't need setting", "")
return
}
if req.IsRecv && !nm.IsRunning {
if req.IsRecv && len(conf.GetConfig().BenefitAddress) >= 1 {
go nm.StartMonitor()
}
if !nm.IsRecvTask && req.IsRecv {
......@@ -206,6 +206,10 @@ func (c *NodeController) DelBenefitAddress() {
c.ResponseInfo(500, "Don't del current benefit address", "")
return
}
if len(nm.HistoryBenefitAcc) == 1 {
c.ResponseInfo(500, "Don't del current benefit address", "")
return
}
isExist := false
for _, s := range nm.HistoryBenefitAcc {
if strings.ToLower(s.Address) == strings.ToLower(req.Address) {
......
......@@ -42,6 +42,8 @@ require (
github.com/golang/protobuf v1.5.3 // indirect
github.com/golang/snappy v0.0.5-0.20220116011046-fa5810519dcb // indirect
github.com/google/uuid v1.4.0 // indirect
github.com/hashicorp/go-immutable-radix v1.3.1 // indirect
github.com/hashicorp/go-memdb v1.3.4 // indirect
github.com/hashicorp/golang-lru v0.5.4 // indirect
github.com/hashicorp/hcl v1.0.0 // indirect
github.com/holiman/uint256 v1.2.4 // indirect
......
......@@ -218,6 +218,12 @@ github.com/google/uuid v1.4.0 h1:MtMxsa51/r9yyhkyLsVeVt0B+BGQZzpQiTQ4eHZ8bc4=
github.com/google/uuid v1.4.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/googleapis/gax-go/v2 v2.0.4/go.mod h1:0Wqv26UfaUD9n4G6kQubkQ+KchISgw+vpHVxEJEs9eg=
github.com/googleapis/gax-go/v2 v2.0.5/go.mod h1:DWXyrwAJ9X0FpwwEdw+IPEYBICEFu5mhpdKc/us6bOk=
github.com/hashicorp/go-immutable-radix v1.3.0/go.mod h1:0y9vanUI8NX6FsYoO3zeMjhV/C5i9g4Q3DwcSNZ4P60=
github.com/hashicorp/go-immutable-radix v1.3.1 h1:DKHmCUm2hRBK510BaiZlwvpD40f8bJFeZnpfm2KLowc=
github.com/hashicorp/go-immutable-radix v1.3.1/go.mod h1:0y9vanUI8NX6FsYoO3zeMjhV/C5i9g4Q3DwcSNZ4P60=
github.com/hashicorp/go-memdb v1.3.4 h1:XSL3NR682X/cVk2IeV0d70N4DZ9ljI885xAEU8IoK3c=
github.com/hashicorp/go-memdb v1.3.4/go.mod h1:uBTr1oQbtuMgd1SSGoR8YV27eT3sBHbYiNm53bMpgSg=
github.com/hashicorp/go-uuid v1.0.0/go.mod h1:6SBZvOh/SIDV7/2o3Jml5SYk/TvGqwFJ/bN7x4byOro=
github.com/hashicorp/golang-lru v0.5.0/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8=
github.com/hashicorp/golang-lru v0.5.1/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8=
github.com/hashicorp/golang-lru v0.5.4 h1:YDjusn29QI/Das2iO9M0BHnIbxPeyuCHsjMW+lJfyTc=
......
......@@ -19,7 +19,7 @@ var (
)
func init() {
IsRecvTask = true
IsRecvTask = false
HistoryBenefitAcc = make([]*models.BenefitAddressStruct, 0)
RunningState = &models.RunningState{
RunningTime: time.Now().Unix(),
......
......@@ -65,6 +65,10 @@ func (m *MonitorNm) monitorNmClient() {
nodeManagerHandler := NewNodeManagerHandler(nodeManager, worker, msgRespWorker, taskMsgWorker)
log.Info("Report model info started")
if nodeManager.IsSelected {
go m.monitorGpuUsage(msgRespWorker, nodeManager, worker)
}
go nodeManagerHandler.MonitorStandardTaskWorker()
log.Info("Monitor standard task worker started")
......@@ -135,3 +139,17 @@ func (m *MonitorNm) monitorNodeManagerSeed() {
}
}
}
// monitorGpuUsage periodically reports GPU usage to the node manager.
// The first report goes out almost immediately (1ms); after that the
// ticker is reset to the steady-state 10-minute reporting interval.
func (m *MonitorNm) monitorGpuUsage(msgRespWorker *RespMsgWorker, nodeManager *models.NodeManagerClient, worker nodemanagerV2.NodeManagerService_RegisterWorkerClient) {
	tick := time.NewTicker(time.Millisecond)
	// Reset (below) reuses this ticker instead of allocating a new one
	// per fire, so the deferred Stop always applies to the live ticker.
	// (The original created a fresh NewTicker on every fire, leaking the
	// previous ticker each cycle and leaving defer stopping only the first.)
	defer tick.Stop()
	for range tick.C {
		msgRespWorker.RegisterMsgResp(nodeManager, worker, GpuUsageResp, nil)
		// Switch to the regular 10-minute cadence after the first report.
		tick.Reset(time.Minute * 10)
	}
}
......@@ -14,10 +14,10 @@ import (
"time"
)
var modelRunningBeoforeMem map[string]int64
var ModelRunningBeforeMem map[string]int64
func init() {
modelRunningBeoforeMem = make(map[string]int64, 0)
ModelRunningBeforeMem = make(map[string]int64, 0)
}
type NodeManagerHandler struct {
......@@ -108,6 +108,7 @@ func (n *NodeManagerHandler) DistributionMsgWorker(nodeManagerMsgChan chan *node
RunningState.CompletedTaskCount++
log.Info("Completed task count: ", RunningState.CompletedTaskCount)
log.Info("--------------taskMsg--------------:", taskMsg)
msgRespWorker.RegisterMsgResp(n.nodeManager, n.worker, GpuUsageResp, ackParams)
}(n.msgRespWorker, n.taskMsgWorker, taskMsg)
continue
}
......@@ -170,49 +171,17 @@ func (n *NodeManagerHandler) DistributionMsgWorker(nodeManagerMsgChan chan *node
}
case nodemanagerV2.ModelOperateType_RUN:
{
envMap := make(map[string]string, 0)
dockerCmd := &models.DockerCmd{
EnvMap: envMap,
HostIp: models.ZeroHost,
HostPort: n.taskMsgWorker.getExternalPort(),
}
info := getHardwareInfo()
if info == nil {
containerId, gpuSeq, err := dockerOp.CreateAndStartContainer(model, dockerCmd)
if err != nil {
log.WithError(err).Error("Error creating container")
continue
}
gpu := info.GPU
isMatch := false
for _, gpuInfo := range gpu {
if gpuInfo.MemFree > model.RunningMem {
envMap[models.CudaEnv] = strconv.FormatInt(int64(gpuInfo.Seq), 10)
isMatch = true
break
}
}
if !isMatch {
runningModel := db.GetRunningModel()
if len(runningModel) == 0 {
continue
}
for _, modelInfo := range runningModel {
if modelInfo.RunningMem > model.RunningMem {
isMatch = true
dockerOp.StopContainer(model.ContainerId)
envMap[models.CudaEnv] = strconv.FormatInt(int64(modelInfo.GpuSeq), 10)
break
}
}
}
if isMatch {
modelRunningBeoforeMem[model.ImageName] = dockerCmd.RunningBeforeMem
gpuSeq, _ := strconv.ParseInt(dockerCmd.EnvMap[models.CudaEnv], 10, 32)
model.GpuSeq = int32(gpuSeq)
_, err := dockerOp.CreateAndStartContainer(model.ImageName, dockerCmd)
if err != nil {
log.WithError(err).Error("Error creating container")
continue
}
}
model.ContainerId = containerId
model.GpuSeq = gpuSeq
}
case nodemanagerV2.ModelOperateType_STOP:
{
......@@ -322,17 +291,17 @@ func (n *NodeManagerHandler) MonitorImageOp(op *nodemanagerV2.ModelOperate) {
if time.Since(now).Minutes() > models.TwoMinutes || isOp {
return
}
info := getHardwareInfo()
info := GetHardwareInfo()
if info == nil {
continue
}
memIsChange := false
for _, gpuInfo := range info.GPU {
if gpuInfo.Seq == model.GpuSeq {
if modelRunningBeoforeMem[op.ImageName] <= gpuInfo.MemFree {
if ModelRunningBeforeMem[op.ImageName] <= gpuInfo.MemFree {
break
}
model.RunningMem = modelRunningBeoforeMem[op.ImageName] - gpuInfo.MemFree
model.RunningMem = ModelRunningBeforeMem[op.ImageName] - gpuInfo.MemFree
memIsChange = true
}
}
......
......@@ -94,7 +94,7 @@ func RegisterInfoResp(params ...interface{}) *nodemanagerV2.WorkerMessage {
if err != nil {
return nil
}
hardwareInfo := getHardwareInfo()
hardwareInfo := GetHardwareInfo()
nodeInfoRes := &nodemanagerV2.WorkerMessage{
Message: &nodemanagerV2.WorkerMessage_RegisteMessage{
RegisteMessage: &nodemanagerV2.RegisteMessage{
......@@ -115,7 +115,7 @@ func RegisterInfoResp(params ...interface{}) *nodemanagerV2.WorkerMessage {
func NodeInfoResp(params ...interface{}) *nodemanagerV2.WorkerMessage {
log.Info("Node info response received params:", params)
hardwareInfo := getHardwareInfo()
hardwareInfo := GetHardwareInfo()
modelsInfo := params[0].(*largeModel.ModelHandler)
readModels, err := modelsInfo.GetRpcModelsResp()
if err != nil {
......@@ -140,7 +140,7 @@ func NodeInfoResp(params ...interface{}) *nodemanagerV2.WorkerMessage {
func DeviceInfoResp(params ...interface{}) *nodemanagerV2.WorkerMessage {
log.Info("Device info response received params:", params)
hardwareInfo := getHardwareInfo()
hardwareInfo := GetHardwareInfo()
deviceInfoRes := &nodemanagerV2.WorkerMessage{
Message: &nodemanagerV2.WorkerMessage_DeviceInfo{
DeviceInfo: &nodemanagerV2.DeviceInfoMessage{
......@@ -155,7 +155,7 @@ func DeviceInfoResp(params ...interface{}) *nodemanagerV2.WorkerMessage {
func DeviceUsageResp(params ...interface{}) *nodemanagerV2.WorkerMessage {
log.Info("DeviceUsageResp params :", params)
hardwareInfo := getHardwareInfo()
hardwareInfo := GetHardwareInfo()
ramUsage := int32((1 - float64(hardwareInfo.RAM.Total)/float64(hardwareInfo.RAM.Free)) * 100)
diskUsage := int32((1 - float64(hardwareInfo.DISK.Total)/float64(hardwareInfo.DISK.Free)) * 100)
deviceInfoRes := &nodemanagerV2.WorkerMessage{
......@@ -176,7 +176,7 @@ func DeviceUsageResp(params ...interface{}) *nodemanagerV2.WorkerMessage {
func GpuUsageResp(params ...interface{}) *nodemanagerV2.WorkerMessage {
log.Info("DeviceUsageResp params :", params)
hardwareInfo := getHardwareInfo()
hardwareInfo := GetHardwareInfo()
gpusUsage := make([]*nodemanagerV2.GPUUsage, 0)
for _, gpuInfo := range hardwareInfo.GPU {
usage := &nodemanagerV2.GPUUsage{
......@@ -355,7 +355,7 @@ func DelModelRunningResp(params ...interface{}) *nodemanagerV2.WorkerMessage {
return delModelRunningRes
}
func getHardwareInfo() *nodemanagerV2.HardwareInfo {
func GetHardwareInfo() *nodemanagerV2.HardwareInfo {
hardwareInfo := utils.GetHardwareInfo(conf.GetConfig().HardwareUrl)
if hardwareInfo == nil {
return nil
......
......@@ -14,7 +14,6 @@ import (
var (
nodeManagerArr []*NodeManager
usedNodeManagerClient []*models.NodeManagerClient
IsRunning bool
)
func init() {
......@@ -23,7 +22,7 @@ func init() {
}
func StartMonitor() {
IsRunning = true
IsRecvTask = true
dockerOp := operate.NewDockerOp()
if !dockerOp.IsHealthy {
log.Error("Docker operation is not healthy reason:", dockerOp.Reason)
......
......@@ -182,13 +182,14 @@ func (t *TaskWorker) ComputeTaskHandler(taskMsg *nodemanagerV2.PushTaskMessage)
if !running {
taskOp.taskCmd.DockerCmd.HostIp = models.ZeroHost
taskOp.taskCmd.DockerCmd.HostPort = t.getExternalPort()
containerId, err := t.DockerOp.CreateAndStartContainer(taskOp.taskCmd.ImageName, taskOp.taskCmd.DockerCmd)
containerId, gpuSeq, err := t.DockerOp.CreateAndStartContainer(model, taskOp.taskCmd.DockerCmd)
if err != nil {
log.Errorf("Create and start container failed: %s", err.Error())
taskOp.taskExecResult.TaskExecError = fmt.Sprintf("%s,%s", "Create and start container failed", err.Error())
t.ExecTaskIdIsFinished.Store(taskMsg.TaskId, true)
return
}
model.GpuSeq = gpuSeq
log.Infof("Started container with ID %s", containerId)
}
if err = taskOp.waitContainerRunning(t, taskOp.taskCmd.ImageName, uint16(taskOp.taskCmd.DockerCmd.ContainerPort)); err != nil {
......
......@@ -5,8 +5,10 @@ import (
"context"
"encoding/json"
"example.com/m/conf"
"example.com/m/db"
"example.com/m/log"
"example.com/m/models"
"example.com/m/nm"
"fmt"
"github.com/docker/docker/api/types"
"github.com/docker/docker/api/types/container"
......@@ -123,18 +125,19 @@ func (d *DockerOp) ListContainer() []types.Container {
return containers
}
func (d *DockerOp) CreateAndStartContainer(imageName string, dockerCmd *models.DockerCmd) (string, error) {
containerId, err := d.CreateContainer(imageName, dockerCmd)
func (d *DockerOp) CreateAndStartContainer(modelInfo *models.ModelInfo, dockerCmd *models.DockerCmd) (string, int32, error) {
gpuSeq := d.checkGpuUsage(modelInfo, dockerCmd)
containerId, err := d.CreateContainer(modelInfo.ImageName, dockerCmd)
if err != nil {
log.Error("Error creating container image failed: ", err)
return "", err
return "", gpuSeq, err
}
// 启动容器
startContainerIsSuccess := d.StartContainer(containerId)
if !startContainerIsSuccess {
log.Error("start container failed:", startContainerIsSuccess)
return "", fmt.Errorf("start container failed")
return "", gpuSeq, fmt.Errorf("start container failed")
}
//ctx, cancel := context.WithTimeout(context.Background(), time.Second*60)
......@@ -160,7 +163,7 @@ func (d *DockerOp) CreateAndStartContainer(imageName string, dockerCmd *models.D
// return "", err
//}
return containerId, nil
return containerId, gpuSeq, nil
}
func (d *DockerOp) CreateContainer(imageName string, dockerCmd *models.DockerCmd) (string, error) {
......@@ -217,6 +220,17 @@ func (d *DockerOp) CreateContainer(imageName string, dockerCmd *models.DockerCmd
func (d *DockerOp) StartContainer(containerID string) bool {
ctx, cancel := context.WithTimeout(context.Background(), time.Minute*20)
defer cancel()
info, err := d.getContainerInfo(containerID)
if err == nil {
for _, port := range info.Ports {
d.UsedExternalPort[int64(port.PublicPort)] = true
}
}
mounts := info.Mounts
for _, mount := range mounts {
if mount.Destination == "/path/to/gpu/memory" {
}
}
// 启动容器
if err := d.dockerClient.ContainerStart(ctx, containerID, types.ContainerStartOptions{}); err != nil {
log.Error("Start container failed:", err)
......@@ -230,6 +244,12 @@ func (d *DockerOp) StartContainer(containerID string) bool {
func (d *DockerOp) StopContainer(containerID string) bool {
ctx, cancel := context.WithTimeout(context.Background(), time.Minute*20)
defer cancel()
info, err := d.getContainerInfo(containerID)
if err == nil {
for _, port := range info.Ports {
d.UsedExternalPort[int64(port.PublicPort)] = false
}
}
// 停止容器(如果正在运行)
if err := d.dockerClient.ContainerStop(ctx, containerID, container.StopOptions{}); err != nil {
// 可能容器已经停止或不存在
......@@ -360,3 +380,50 @@ func (d *DockerOp) GetDockerInfo() (int64, int64, int64, int64, error) {
}
return 0, 0, 0, 0, fmt.Errorf("get disk size failed")
}
// getContainerInfo returns the container whose ID matches id from the
// current container listing, or an error when no such container exists.
func (d *DockerOp) getContainerInfo(id string) (types.Container, error) {
	for _, c := range d.ListContainer() {
		if c.ID == id {
			return c, nil
		}
	}
	return types.Container{}, fmt.Errorf("get container info failed")
}
// checkGpuUsage picks a GPU with enough free memory to run modelInfo,
// records the chosen CUDA device in dockerCmd.EnvMap, and returns its
// sequence number. If no GPU has enough free memory it tries to evict a
// running model with a larger memory footprint and reuse its GPU.
// Returns 0 when hardware info is unavailable or no GPU could be chosen
// (note 0 is also a valid GPU sequence number).
func (d *DockerOp) checkGpuUsage(modelInfo *models.ModelInfo, dockerCmd *models.DockerCmd) int32 {
	info := nm.GetHardwareInfo()
	if info == nil {
		return 0
	}
	// Write the CUDA selection into the command the container will be
	// started with. The original wrote into a throwaway local map, so
	// the selection never reached dockerCmd.EnvMap, which is what is
	// read back below and used when starting the container.
	if dockerCmd.EnvMap == nil {
		dockerCmd.EnvMap = make(map[string]string)
	}
	isMatch := false
	// First pass: any GPU whose free memory covers the model's needs.
	for _, gpuInfo := range info.GPU {
		if gpuInfo.MemFree > modelInfo.RunningMem {
			dockerCmd.EnvMap[models.CudaEnv] = strconv.FormatInt(int64(gpuInfo.Seq), 10)
			isMatch = true
			break
		}
	}
	if !isMatch {
		// Second pass: evict a running model that uses more memory than
		// the incoming one and take over its GPU. The original compared
		// the loop variable's RunningMem to itself (a shadowing bug), so
		// this branch could never match.
		runningModels := db.GetRunningModel()
		if len(runningModels) == 0 {
			return 0
		}
		for _, running := range runningModels {
			if running.RunningMem > modelInfo.RunningMem {
				isMatch = true
				d.StopContainer(running.ContainerId)
				dockerCmd.EnvMap[models.CudaEnv] = strconv.FormatInt(int64(running.GpuSeq), 10)
				break
			}
		}
	}
	if isMatch {
		// Remember the pre-launch free memory so MonitorImageOp can later
		// compute the model's actual memory consumption.
		nm.ModelRunningBeforeMem[modelInfo.ImageName] = dockerCmd.RunningBeforeMem
		gpuSeq, _ := strconv.ParseInt(dockerCmd.EnvMap[models.CudaEnv], 10, 32)
		return int32(gpuSeq)
	}
	return 0
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment