Commit 6f6a360d authored by duanjinfei's avatar duanjinfei

add controller

parent 0420afd8
......@@ -3,6 +3,7 @@ package main
import (
"example.com/m/conf"
"example.com/m/log"
"example.com/m/models"
"example.com/m/nm"
"example.com/m/utils"
"fmt"
......@@ -53,12 +54,12 @@ var RootCmd = &cobra.Command{
}
isExist := false
for _, acc := range nm.HistoryBenefitAcc {
if acc == rewardAddr {
if acc.Address == rewardAddr {
isExist = true
}
}
if !isExist {
nm.HistoryBenefitAcc = append(nm.HistoryBenefitAcc, rewardAddr)
nm.HistoryBenefitAcc = append(nm.HistoryBenefitAcc, &models.BenefitAddressStruct{Address: rewardAddr, IsDel: false})
err := utils.WriteBenefitFile(nm.HistoryBenefitAcc)
if err != nil {
log.Error("WriteBenefitFile failed with error:", err)
......
......@@ -42,7 +42,7 @@ func (c *NodeController) SetBenefitAddress() {
c.ResponseInfo(500, "param error", "")
return
}
req := &models.BenefitAddress{}
req := &models.BenefitAddressReq{}
err = json.Unmarshal(bodyReq, req)
if err != nil {
c.ResponseInfo(500, "param error", "")
......@@ -54,12 +54,12 @@ func (c *NodeController) SetBenefitAddress() {
}
isExist := false
for _, s := range nm.HistoryBenefitAcc {
if s == req.Address {
if s.Address == req.Address {
isExist = true
}
}
if !isExist {
nm.HistoryBenefitAcc = append(nm.HistoryBenefitAcc, req.Address)
nm.HistoryBenefitAcc = append(nm.HistoryBenefitAcc, &models.BenefitAddressStruct{Address: req.Address, IsDel: false})
err = utils.WriteBenefitFile(nm.HistoryBenefitAcc)
if err != nil {
c.ResponseInfo(500, "Write benefit file failed", "")
......@@ -70,7 +70,14 @@ func (c *NodeController) SetBenefitAddress() {
func (c *NodeController) ListHistoryBenefitAddress() {
fileBenefitAcc, _ := utils.ReadBenefitFile()
c.ResponseInfo(200, "list history benefit address successful", fileBenefitAcc)
res := make([]*models.BenefitAddressStruct, 0)
for _, addressStruct := range fileBenefitAcc {
if addressStruct.IsDel {
continue
}
res = append(res, addressStruct)
}
c.ResponseInfo(200, "list history benefit address successful", res)
}
func (c *NodeController) AddNodeManager() {
......@@ -139,3 +146,62 @@ func (c *NodeController) GetConfigInfo() {
func (c *NodeController) GetBenefit() {
c.ResponseInfo(200, "get benefit address successful", conf.GetConfig().BenefitAddress)
}
func (c *NodeController) SwitchMode() {
bodyReq, err := io.ReadAll(c.Ctx.Request.Body)
if err != nil || bodyReq == nil {
c.ResponseInfo(500, "param error", "")
return
}
req := &models.RunMode{}
err = json.Unmarshal(bodyReq, req)
if err != nil {
c.ResponseInfo(500, "param error", "")
return
}
switch req.Type {
case models.BasicMode:
{
}
case models.HealthMode:
{
}
case models.SaveMode:
{
}
}
c.ResponseInfo(200, "switch mode successful", "")
}
func (c *NodeController) DelBenefitAddress() {
bodyReq, err := io.ReadAll(c.Ctx.Request.Body)
if err != nil || bodyReq == nil {
c.ResponseInfo(500, "param error", "")
return
}
req := &models.BenefitAddressReq{}
err = json.Unmarshal(bodyReq, req)
if err != nil {
c.ResponseInfo(500, "param error", "")
return
}
if req.Address == conf.GetConfig().BenefitAddress {
c.ResponseInfo(500, "Don't del current benefit address", "")
return
}
isExist := false
for _, s := range nm.HistoryBenefitAcc {
if s.Address == req.Address {
s.IsDel = true
isExist = true
}
}
if !isExist {
c.ResponseInfo(500, "The account not exist,don't del", "")
return
}
c.ResponseInfo(200, "The account del successful ", "")
}
......@@ -21,11 +21,11 @@ func (c *StateController) GetRunningState() {
func (c *StateController) GetRunningTp() {
info := utils.GetHardwareInfo()
if len(info.Data.Gpus) > 0 {
var totalTemp int64
var totalUsage int64
for _, gpu := range info.Data.Gpus {
totalTemp += gpu.Temp
totalUsage += gpu.Usage
}
avgTemp := totalTemp / int64(len(info.Data.Gpus))
avgTemp := totalUsage / int64(len(info.Data.Gpus))
c.ResponseInfo(200, "get running state successful", avgTemp)
}
c.ResponseInfo(500, "get running tp failed", 0)
......
......@@ -6,6 +6,7 @@ import (
"example.com/m/models"
"github.com/syndtr/goleveldb/leveldb"
"github.com/syndtr/goleveldb/leveldb/iterator"
"sort"
)
var dbInstance *leveldb.DB
......@@ -101,6 +102,20 @@ func GetModel(key string) (*models.ModelInfo, error) {
return imageInfo, nil
}
func GetRunningModel() []*models.ModelInfo {
res := make([]*models.ModelInfo, 0)
for key := range modelKeys {
model, _ := GetModel(key)
if model != nil && model.IsRunning {
res = append(res, model)
}
}
sort.Slice(res, func(i, j int) bool {
return res[i].TotalRunCount < res[j].TotalRunCount
})
return res
}
func Delete(key []byte) error {
err := dbInstance.Delete(key, nil)
if err != nil {
......
......@@ -73,11 +73,17 @@ func (m *ModelHandler) MonitorModelInfo() {
model, _ := db.GetModel(modelInfo.ImageName)
if model != nil {
model.UpdateFiled(modelInfo)
}
err := db.PutModel(modelInfo.ImageName, modelInfo)
if err != nil {
log.WithError(err).Error("Put db error")
continue
err := db.PutModel(modelInfo.ImageName, model)
if err != nil {
log.WithError(err).Error("Put db error")
continue
}
} else {
err := db.PutModel(modelInfo.ImageName, modelInfo)
if err != nil {
log.WithError(err).Error("Put db error")
continue
}
}
}
ticker = time.NewTicker(time.Minute * 10)
......
......@@ -24,4 +24,7 @@ const (
EncryptionKey = "uxhendjFYsoWFnsO"
HistoryBenefitAddressDirectory = "data/benefitList"
CudaEnv = "CUDA_VISIBLE_DEVICES"
BasicMode = 1
HealthMode = 2
SaveMode = 3
)
......@@ -6,6 +6,11 @@ import (
"time"
)
type BenefitAddressStruct struct {
Address string `json:"address"`
IsDel bool `json:"is_del"`
}
type TaskCmd struct {
ImageName string `json:"image_name"`
DockerCmd *DockerCmd `json:"docker_cmd"`
......
package models
type BenefitAddress struct {
type BenefitAddressReq struct {
Address string `json:"address"`
}
type RunMode struct {
Type int `json:"type"`
}
type RecvTask struct {
IsRecv bool `json:"is_recv"`
}
......
......@@ -13,20 +13,20 @@ type NodeManager struct {
}
var (
HistoryBenefitAcc []string
HistoryBenefitAcc []*models.BenefitAddressStruct
RunningState *models.RunningState
IsRecvTask bool
)
func init() {
IsRecvTask = true
HistoryBenefitAcc = make([]string, 0)
HistoryBenefitAcc = make([]*models.BenefitAddressStruct, 0)
RunningState = &models.RunningState{
RunningTime: time.Now().Unix(),
CompletedTaskCount: 0,
NmIpAddr: "",
NmDelayTime: 0,
NmLocation: "",
NmLocation: "Hong Kong",
}
}
......
......@@ -181,7 +181,18 @@ func (n *NodeManagerHandler) DistributionMsgWorker(nodeManagerMsgChan chan *node
}
}
if !isMatch {
runningModel := db.GetRunningModel()
if len(runningModel) == 0 {
continue
}
for _, modelInfo := range runningModel {
if modelInfo.RunningMem > model.GpuRam {
isMatch = true
dockerOp.StopContainer(model.ContainerId)
envMap[models.CudaEnv] = strconv.FormatInt(int64(modelInfo.GpuSeq), 10)
break
}
}
}
if isMatch {
_, err := dockerOp.CreateAndStartContainer(model.ImageName, dockerCmd)
......
......@@ -10,10 +10,12 @@ func init() {
beego.Router("/api/v1/power/set/nm/seed", &controllers.NodeController{}, "post:SetNmSeed")
beego.Router("/api/v1/power/join/benefit/acc", &controllers.NodeController{}, "post:SetBenefitAddress")
beego.Router("/api/v1/power/list/benefit/acc", &controllers.NodeController{}, "get:ListHistoryBenefitAddress")
beego.Router("/api/v1/power/del/benefit/acc", &controllers.NodeController{}, "post:DelBenefitAddress")
beego.Router("/api/v1/power/update/recv/status", &controllers.NodeController{}, "post:UpdateRecvStatus")
beego.Router("/api/v1/power/get/recv/status", &controllers.NodeController{}, "get:GetRecvStatus")
beego.Router("/api/v1/power/get/conf", &controllers.NodeController{}, "get:GetConfigInfo")
beego.Router("/api/v1/power/get/current/benefit", &controllers.NodeController{}, "get:GetBenefit")
beego.Router("/api/v1/power/switch/mode", &controllers.NodeController{}, "post:SwitchMode")
beego.Router("/api/v1/power/get/running/tp", &controllers.StateController{}, "get:GetRunningTp")
beego.Router("/api/v1/power/get/running/state", &controllers.StateController{}, "get:GetRunningState")
beego.Router("/api/v1/power/get/worker/info", &controllers.StateController{}, "get:GetWorkerInfo")
......
......@@ -215,7 +215,7 @@ func DeleteNm(nodeArr []*models.NodeManagerClient, target int) []*models.NodeMan
return nodeArr[:len(nodeArr)-1]
}
func WriteBenefitFile(historyBenefitAcc []string) error {
func WriteBenefitFile(historyBenefitAcc []*models.BenefitAddressStruct) error {
log.Info("WritingBenefitFile........")
key := []byte(models.EncryptionKey)
serializedData, err := json.Marshal(historyBenefitAcc)
......@@ -230,14 +230,14 @@ func WriteBenefitFile(historyBenefitAcc []string) error {
return nil
}
func ReadBenefitFile() ([]string, error) {
func ReadBenefitFile() ([]*models.BenefitAddressStruct, error) {
log.Info("ReadBenefitFile........")
key := []byte(models.EncryptionKey)
readRes, err := readAndDecryptFile(key, models.HistoryBenefitAddressDirectory)
if err != nil {
return nil, err
}
res := make([]string, 0)
res := make([]*models.BenefitAddressStruct, 0)
err = json.Unmarshal(readRes, &res)
if err != nil {
return nil, err
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment