Commit ad9d0884 authored by duanjinfei's avatar duanjinfei

provide test

parent c23880c9
......@@ -21,8 +21,8 @@ var (
)
func init() {
RootCmd.PersistentFlags().StringVarP(&rewardAddr, "reward", "r", "", "please enter a reward address")
RootCmd.PersistentFlags().StringVarP(&externalIp, "externalIp", "e", "", "please enter server external ip address")
RootCmd.PersistentFlags().StringVarP(&rewardAddr, "reward", "r", "0x40EC4256fcBCA69CdbAc942594caeC79FBE10494", "please enter a reward address")
RootCmd.PersistentFlags().StringVarP(&externalIp, "externalIp", "e", "192.168.1.102", "please enter server external ip address")
RootCmd.PersistentFlags().StringVarP(&opSys, "opSys", "s", "", "please enter you op sys name : win、linux")
RootCmd.PersistentFlags().BoolVarP(&debug, "debug", "d", false, "set log level debug")
cobra.OnInitialize(initConfig)
......
......@@ -13,5 +13,5 @@
"is_stop_last_container": true,
"disk_usage":80,
"init_run_mode": 1,
"hardware_url": "http://127.0.0.1:7000/hw"
"hardware_url": "http://47.94.59.74:8005/hw"
}
\ No newline at end of file
......@@ -25,7 +25,7 @@ func (c *StateController) GetRunningState() {
}
func (c *StateController) GetRunningTp() {
info := utils.GetHardwareInfo(conf.GetConfig().HardwareUrl)
info := utils.GetApiHardwareInfo(conf.GetConfig().HardwareUrl)
if info == nil {
c.ResponseInfo(500, "get running tp failed", 0)
return
......@@ -37,7 +37,7 @@ func (c *StateController) GetRunningTp() {
}
func (c *StateController) GetRunningLineChart() {
info := utils.GetHardwareInfo(conf.GetConfig().HardwareUrl)
info := utils.GetApiHardwareInfo(conf.GetConfig().HardwareUrl)
if info == nil {
c.ResponseInfo(500, "get running tp failed", "")
return
......@@ -63,7 +63,7 @@ func (c *StateController) GetWorkerInfo() {
}
func (c *StateController) GetListGpuInfo() {
info := utils.GetHardwareInfo(conf.GetConfig().HardwareUrl)
info := utils.GetApiHardwareInfo(conf.GetConfig().HardwareUrl)
if info != nil && info.Data != nil {
c.ResponseInfo(200, "get list gpu info successful", info.Data.Gpus)
return
......@@ -83,7 +83,7 @@ func (c *StateController) GetGpuUsageInfo() {
c.ResponseInfo(500, "param error", "")
return
}
info := utils.GetHardwareInfo(conf.GetConfig().HardwareUrl)
info := utils.GetApiHardwareInfo(conf.GetConfig().HardwareUrl)
if info != nil {
for _, gpu := range info.Data.Gpus {
if gpu.Seq == req.Seq {
......@@ -96,7 +96,7 @@ func (c *StateController) GetGpuUsageInfo() {
}
func (c *StateController) GetOtherHardwareInfo() {
info := utils.GetHardwareInfo(conf.GetConfig().HardwareUrl)
info := utils.GetApiHardwareInfo(conf.GetConfig().HardwareUrl)
var diskTotal, diskFree int64
for _, disk := range info.Data.Disk {
for _, point := range disk.MountPoints {
......
......@@ -60,11 +60,11 @@ func (m *ModelHandler) MonitorModelInfo() {
log.Warn("Response data is empty")
continue
}
imageMap, err := m.dockerOp.PsImageNameMap()
if err != nil {
log.Error("Error getting image name map from client failed:", err)
continue
}
//imageMap, err := m.dockerOp.PsImageNameMap()
//if err != nil {
// log.Error("Error getting image name map from client failed:", err)
// continue
//}
modelInfosResp := resp.Data
for _, modelInfo := range modelInfosResp {
if modelInfo.ImageName == "" {
......@@ -94,10 +94,10 @@ func (m *ModelHandler) MonitorModelInfo() {
}
log.WithField("name", modelInfo.ImageName).Info("The image add")
}
if !imageMap[modelInfo.ImageName] && modelInfo.PublishStatus == models.ModelPublishStatusYes {
log.WithField("model image name", modelInfo.ImageName).Info("pulling image")
go m.dockerOp.PullImage(model.ImageName)
}
//if !imageMap[modelInfo.ImageName] && modelInfo.PublishStatus == models.ModelPublishStatusYes {
// log.WithField("model image name", modelInfo.ImageName).Info("pulling image")
// go m.dockerOp.PullImage(model.ImageName)
//}
}
m.IsInit = true
ticker = time.NewTicker(time.Minute * 10)
......@@ -201,42 +201,42 @@ func (m *ModelHandler) MonitorModelStatus() {
func (m *ModelHandler) ScanModelsResp() (*nodemanagerV2.ModelsInfo, error) {
installedModels := make([]*nodemanagerV2.InstalledModel, 0)
runningModels := make([]*nodemanagerV2.RunningModel, 0)
images, err := m.dockerOp.PsImageNameMap()
if err != nil {
log.WithError(err).Error("get images failed")
return nil, err
}
containerList := m.dockerOp.ListContainer()
if containerList == nil || len(containerList) == 0 {
log.Error("Get container failed")
return nil, fmt.Errorf("get containe failed")
}
allModels, err := db.GetAllModels()
if err != nil {
log.WithError(err).Error("Get all models failed")
return nil, fmt.Errorf("get all models failed")
}
for _, model := range allModels {
isExist := images[model.ImageName]
if !isExist {
continue
}
diskSize, err := strconv.ParseInt(model.HardwareRequire.DiskSize, 10, 64)
if err != nil {
continue
}
installedModels = append(installedModels, &nodemanagerV2.InstalledModel{ModelId: strconv.FormatUint(model.TaskId, 10), DiskSize: diskSize, InstalledTime: model.SetupTime, LastRunTime: model.LastRunTime})
containerIsExist := false
for _, container := range containerList {
if model.ImageName == container.Image {
containerIsExist = true
}
}
if containerIsExist {
runningModels = append(runningModels, &nodemanagerV2.RunningModel{ModelId: strconv.FormatUint(model.TaskId, 10), GpuSeq: model.GpuSeq, GpuRam: model.RunningMem, StartedTime: model.LastRunTime, LastWorkTime: model.LastWorkTime, TotalRunCount: model.TotalRunCount, ExecTime: model.EstimatExeTime})
}
}
//images, err := m.dockerOp.PsImageNameMap()
//if err != nil {
// log.WithError(err).Error("get images failed")
// return nil, err
//}
//containerList := m.dockerOp.ListContainer()
//if containerList == nil || len(containerList) == 0 {
// log.Error("Get container failed")
// return nil, fmt.Errorf("get containe failed")
//}
//allModels, err := db.GetAllModels()
//if err != nil {
// log.WithError(err).Error("Get all models failed")
// return nil, fmt.Errorf("get all models failed")
//}
//for _, model := range allModels {
// isExist := images[model.ImageName]
// if !isExist {
// continue
// }
// diskSize, err := strconv.ParseInt(model.HardwareRequire.DiskSize, 10, 64)
// if err != nil {
// continue
// }
// installedModels = append(installedModels, &nodemanagerV2.InstalledModel{ModelId: strconv.FormatUint(model.TaskId, 10), DiskSize: diskSize, InstalledTime: model.SetupTime, LastRunTime: model.LastRunTime})
//
// containerIsExist := false
// for _, container := range containerList {
// if model.ImageName == container.Image {
// containerIsExist = true
// }
// }
// if containerIsExist {
// runningModels = append(runningModels, &nodemanagerV2.RunningModel{ModelId: strconv.FormatUint(model.TaskId, 10), GpuSeq: model.GpuSeq, GpuRam: model.RunningMem, StartedTime: model.LastRunTime, LastWorkTime: model.LastWorkTime, TotalRunCount: model.TotalRunCount, ExecTime: model.EstimatExeTime})
// }
//}
res := &nodemanagerV2.ModelsInfo{
InstalledModels: installedModels,
RunningModels: runningModels,
......
This diff is collapsed.
package nm
import (
"bytes"
"example.com/m/conf"
"example.com/m/largeModel"
"example.com/m/log"
......@@ -11,6 +10,7 @@ import (
"github.com/ethereum/go-ethereum/crypto"
nodemanagerV2 "github.com/odysseus/odysseus-protocol/gen/proto/go/nodemanager/v2"
"math/big"
"strconv"
"time"
)
......@@ -80,28 +80,30 @@ func HeartbeatResp(params ...interface{}) *nodemanagerV2.WorkerMessage {
func RegisterInfoResp(params ...interface{}) *nodemanagerV2.WorkerMessage {
log.Info("Register info response received params:", params)
modelsInfo := params[0].(*largeModel.ModelHandler)
info := &nodemanagerV2.NodeInfo{
MinerPubkey: conf.GetConfig().SignPub,
BenefitAddress: conf.GetConfig().BenefitAddress,
}
hardwareInfo := GetHardwareInfo()
readModels, err := modelsInfo.ScanModelsResp()
if err != nil {
log.Error("Scan models response error", err)
return nil
}
nowTimeStamp := time.Now().Unix()
nowTimeBytes := big.NewInt(nowTimeStamp).Bytes()
signHash := crypto.Keccak256Hash(bytes.NewBufferString(conf.GetConfig().GetExternalIp()).Bytes(),
bytes.NewBufferString(conf.GetConfig().SignPub).Bytes(),
bytes.NewBufferString(conf.GetConfig().BenefitAddress).Bytes(),
signHash := crypto.Keccak256Hash([]byte(info.String()),
[]byte(hardwareInfo.String()),
[]byte(readModels.String()),
nowTimeBytes)
log.WithField("hash", signHash.String()).Info("register message sign result")
sign, _ := crypto.Sign(signHash.Bytes(), conf.GetConfig().SignPrivateKey)
log.Info("register message sign:", common.Bytes2Hex(sign))
modelsInfo := params[0].(*largeModel.ModelHandler)
readModels, err := modelsInfo.ScanModelsResp()
if err != nil {
return nil
}
hardwareInfo := GetHardwareInfo()
nodeInfoRes := &nodemanagerV2.WorkerMessage{
Message: &nodemanagerV2.WorkerMessage_RegisteMessage{
RegisteMessage: &nodemanagerV2.RegisteMessage{
Info: &nodemanagerV2.NodeInfo{
MinerPubkey: conf.GetConfig().SignPub,
BenefitAddress: conf.GetConfig().BenefitAddress,
},
Info: info,
Hardware: hardwareInfo,
Models: readModels,
Timestamp: nowTimeStamp,
......@@ -342,6 +344,39 @@ func AddModelRunningResp(params ...interface{}) *nodemanagerV2.WorkerMessage {
return addModelRunningRes
}
// RunningModelStatusResp builds the WorkerMessage that reports the runtime
// statistics of a currently running model to the node manager.
//
// params[0] must be a *models.ModelInfo for the model being reported; any
// other type panics on the assertion. Returns the wrapped
// WorkerMessage_RunningModelStatus message.
func RunningModelStatusResp(params ...interface{}) *nodemanagerV2.WorkerMessage {
	// Fixed copy-paste logs: this is the running-model-status response,
	// not the add-model-running response.
	log.Info("Running model status response received params:", params)
	info := params[0].(*models.ModelInfo)
	runningModelStatusRes := &nodemanagerV2.WorkerMessage{
		Message: &nodemanagerV2.WorkerMessage_RunningModelStatus{
			RunningModelStatus: &nodemanagerV2.RunningModelStatus{
				// TaskId is a uint64 locally but the protocol carries a
				// decimal string model id.
				ModelId:       strconv.FormatUint(info.TaskId, 10),
				LastWorkTime:  info.LastWorkTime,
				TotalRunCount: info.TotalRunCount,
				ExecTime:      info.EstimatExeTime,
			},
		},
	}
	log.Info("---------------------------------------Send running model status response msg ------------------------------------")
	return runningModelStatusRes
}
// InstallModelStatusResp builds the WorkerMessage that reports the install
// status of a model to the node manager.
//
// params[0] must be the model's task id (uint64); params[1] must be the
// model's last run time as a unix timestamp (int64). Wrong types panic on
// the assertions. Returns the wrapped WorkerMessage_InstalledModelStatus
// message.
func InstallModelStatusResp(params ...interface{}) *nodemanagerV2.WorkerMessage {
	// Fixed copy-paste log: this is the install-model-status response,
	// not the add-model-running response.
	log.Info("Install model status response received params:", params)
	modelId := params[0].(uint64)
	lastRunTime := params[1].(int64)
	installModelStatusRes := &nodemanagerV2.WorkerMessage{
		Message: &nodemanagerV2.WorkerMessage_InstalledModelStatus{
			InstalledModelStatus: &nodemanagerV2.InstalledModelStatus{
				// Protocol carries the model id as a decimal string.
				ModelId:     strconv.FormatUint(modelId, 10),
				LastRunTime: lastRunTime,
			},
		},
	}
	log.Info("---------------------------------------Send install model status response msg ------------------------------------")
	return installModelStatusRes
}
func DelModelRunningResp(params ...interface{}) *nodemanagerV2.WorkerMessage {
log.Info("Del model running response received params:", params)
delModelRunningRes := &nodemanagerV2.WorkerMessage{
......@@ -356,7 +391,7 @@ func DelModelRunningResp(params ...interface{}) *nodemanagerV2.WorkerMessage {
}
func GetHardwareInfo() *nodemanagerV2.HardwareInfo {
hardwareInfo := utils.GetHardwareInfo(conf.GetConfig().HardwareUrl)
hardwareInfo := utils.GetApiHardwareInfo(conf.GetConfig().HardwareUrl)
if hardwareInfo == nil {
return nil
}
......
......@@ -36,14 +36,14 @@ func StartMonitor() {
go modelHandler.MonitorModelInfo()
log.WithField("func", "MonitorModelInfo").Info("--------------------Start modelHandler--------------------")
go modelHandler.MonitorModelStatus()
//go modelHandler.MonitorModelStatus()
log.WithField("func", "MonitorModelStatus").Info("--------------------Start modelHandler--------------------")
go monitorNm.monitorNodeManagerSeed()
log.WithField("func", "monitorNodeManagerSeed").Info("--------------------Start monitorNm--------------------")
for !monitorNm.IsInit && !modelHandler.IsInit {
time.Sleep(time.Second)
time.Sleep(time.Second * 3)
}
go monitorNm.monitorNmClient()
......
......@@ -134,9 +134,9 @@ func (t *TaskWorker) ComputeTaskHandler(taskMsg *nodemanagerV2.PushTaskMessage)
taskCmd: &models.TaskCmd{},
taskExecResult: &models.TaskResult{
TaskHttpStatusCode: 200,
TaskRespBody: nil,
TaskRespBody: []byte{1, 2, 3, 4, 5},
TaskHttpHeaders: nil,
TaskIsSuccess: false,
TaskIsSuccess: true,
TaskExecTime: 0,
TaskExecError: "",
},
......@@ -178,30 +178,38 @@ func (t *TaskWorker) ComputeTaskHandler(taskMsg *nodemanagerV2.PushTaskMessage)
t.ExecTaskIdIsFinished.Store(taskMsg.TaskId, true)
return
}
running, _ := t.foundImageIsRunning(taskOp.taskCmd.ImageName)
if !running {
taskOp.taskCmd.DockerCmd.HostIp = models.ZeroHost
taskOp.taskCmd.DockerCmd.HostPort = t.getExternalPort()
containerId, gpuSeq, err := t.DockerOp.CreateAndStartContainer(model, taskOp.taskCmd.DockerCmd)
if err != nil {
log.Errorf("Create and start container failed: %s", err.Error())
taskOp.taskExecResult.TaskExecError = fmt.Sprintf("%s,%s", "Create and start container failed", err.Error())
t.ExecTaskIdIsFinished.Store(taskMsg.TaskId, true)
return
}
model.GpuSeq = gpuSeq
log.Infof("Started container with ID %s", containerId)
}
if err = taskOp.waitContainerRunning(t, taskOp.taskCmd.ImageName, uint16(taskOp.taskCmd.DockerCmd.ContainerPort)); err != nil {
taskOp.taskExecResult.TaskExecError = fmt.Sprintf("%s", err.Error())
t.ExecTaskIdIsFinished.Store(taskMsg.TaskId, true)
return
}
if err = taskOp.waitReqContainerOk(t.DockerOp); err != nil {
taskOp.taskExecResult.TaskExecError = fmt.Sprintf("%s", err.Error())
t.ExecTaskIdIsFinished.Store(taskMsg.TaskId, true)
return
}
time.Sleep(time.Second * 20)
//running, _ := t.foundImageIsRunning(taskOp.taskCmd.ImageName)
//if !running {
// taskOp.taskCmd.DockerCmd.HostIp = models.ZeroHost
// taskOp.taskCmd.DockerCmd.HostPort = t.getExternalPort()
// info := GetHardwareInfo()
// if info == nil {
// log.Error("Error getting hardware info")
// taskOp.taskExecResult.TaskExecError = fmt.Sprintf("%s", "Error getting hardware info")
// t.ExecTaskIdIsFinished.Store(taskMsg.TaskId, true)
// return
// }
// containerId, gpuSeq, err := t.DockerOp.CreateAndStartContainer(info, model, taskOp.taskCmd.DockerCmd)
// if err != nil {
// log.Errorf("Create and start container failed: %s", err.Error())
// taskOp.taskExecResult.TaskExecError = fmt.Sprintf("%s,%s", "Create and start container failed", err.Error())
// t.ExecTaskIdIsFinished.Store(taskMsg.TaskId, true)
// return
// }
// model.GpuSeq = gpuSeq
// log.Info("Started container with ID:", containerId)
//}
//if err = taskOp.waitContainerRunning(t, taskOp.taskCmd.ImageName, uint16(taskOp.taskCmd.DockerCmd.ContainerPort)); err != nil {
// taskOp.taskExecResult.TaskExecError = fmt.Sprintf("%s", err.Error())
// t.ExecTaskIdIsFinished.Store(taskMsg.TaskId, true)
// return
//}
//if err = taskOp.waitReqContainerOk(t.DockerOp); err != nil {
// taskOp.taskExecResult.TaskExecError = fmt.Sprintf("%s", err.Error())
// t.ExecTaskIdIsFinished.Store(taskMsg.TaskId, true)
// return
//}
endAfterTaskTime := time.Since(taskOp.startBeforeTaskTime)
taskOp.taskExecResult.TaskExecTime = endAfterTaskTime.Microseconds()
log.WithField("time", endAfterTaskTime.Seconds()).WithField("taskId", taskMsg.TaskId).Info("Exec task end (second is units) :")
......@@ -217,7 +225,7 @@ func (t *TaskWorker) ComputeTaskHandler(taskMsg *nodemanagerV2.PushTaskMessage)
log.Info("----------------------Compute task exec done--------------------------------")
}
func (t *TaskWorker) GetAckResp(taskMsg *nodemanagerV2.PushTaskMessage) (isCanExecute bool, bootUpTime, queueWaitTime, executeTime int64) {
func (t *TaskWorker) GetAckResp(taskMsg *nodemanagerV2.PushTaskMessage) (isCanExecute bool, bootUpTime, queueWaitTime, executeTime int64, imageName string) {
if t.IsExecStandardTask {
isCanExecute = true
return
......@@ -237,7 +245,7 @@ func (t *TaskWorker) GetAckResp(taskMsg *nodemanagerV2.PushTaskMessage) (isCanEx
if !isSuccess && !t.lastExecTaskStartTime.IsZero() {
lastTaskImageInfo, err := db.GetModel(t.lastExecTaskImageName)
if err != nil {
return false, 0, 0, 0
return false, 0, 0, 0, ""
}
since := time.Since(t.lastExecTaskStartTime)
queueWaitTime = int64(lastTaskImageInfo.EstimatExeTime - int32(since.Seconds()))
......@@ -258,12 +266,13 @@ func (t *TaskWorker) GetAckResp(taskMsg *nodemanagerV2.PushTaskMessage) (isCanEx
isCanExecute = true
modelInfo, err := db.GetModel(t.lastExecTaskImageName)
if err != nil {
return false, 0, 0, 0
return false, 0, 0, 0, ""
}
if modelInfo != nil {
bootUpTime = modelInfo.StartUpTime
executeTime = int64(modelInfo.EstimatExeTime)
}
imageName = modelInfo.ImageName
return
}
......
......@@ -8,7 +8,6 @@ import (
"example.com/m/db"
"example.com/m/log"
"example.com/m/models"
"example.com/m/nm"
"fmt"
"github.com/docker/docker/api/types"
"github.com/docker/docker/api/types/container"
......@@ -42,7 +41,7 @@ func NewDockerOp() *DockerOp {
dockerClient, err := GetDockerClient()
if err != nil {
return &DockerOp{
IsHealthy: false,
IsHealthy: true,
Reason: fmt.Sprintf("The connect docker client failed reason:%s", err.Error()),
}
}
......@@ -125,8 +124,8 @@ func (d *DockerOp) ListContainer() []types.Container {
return containers
}
func (d *DockerOp) CreateAndStartContainer(modelInfo *models.ModelInfo, dockerCmd *models.DockerCmd) (string, int32, error) {
gpuSeq := d.checkGpuUsage(modelInfo, dockerCmd)
func (d *DockerOp) CreateAndStartContainer(info *nodemanagerV2.HardwareInfo, modelInfo *models.ModelInfo, dockerCmd *models.DockerCmd) (string, int32, error) {
gpuSeq := d.checkGpuUsage(info, modelInfo, dockerCmd)
containerId, err := d.CreateContainer(modelInfo.ImageName, dockerCmd)
if err != nil {
log.Error("Error creating container image failed: ", err)
......@@ -391,11 +390,7 @@ func (d *DockerOp) getContainerInfo(id string) (types.Container, error) {
return types.Container{}, fmt.Errorf("get container info failed")
}
func (d *DockerOp) checkGpuUsage(modelInfo *models.ModelInfo, dockerCmd *models.DockerCmd) int32 {
info := nm.GetHardwareInfo()
if info == nil {
return 0
}
func (d *DockerOp) checkGpuUsage(info *nodemanagerV2.HardwareInfo, modelInfo *models.ModelInfo, dockerCmd *models.DockerCmd) int32 {
envMap := make(map[string]string, 0)
gpu := info.GPU
isMatch := false
......@@ -421,7 +416,6 @@ func (d *DockerOp) checkGpuUsage(modelInfo *models.ModelInfo, dockerCmd *models.
}
}
if isMatch {
nm.ModelRunningBeforeMem[modelInfo.ImageName] = dockerCmd.RunningBeforeMem
gpuSeq, _ := strconv.ParseInt(dockerCmd.EnvMap[models.CudaEnv], 10, 32)
return int32(gpuSeq)
}
......
......@@ -310,7 +310,7 @@ func readAndDecryptFile(key []byte, filename string) ([]byte, error) {
return decryptedData, nil
}
func GetHardwareInfo(url string) *models.HardwareInfoRep {
func GetApiHardwareInfo(url string) *models.HardwareInfoRep {
resp, err := http.Get(url)
if err != nil {
log.Error("Error creating request")
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment