Commit 11fb03c0 authored by duanjinfei's avatar duanjinfei

update msg resp

parent 26212fdb
......@@ -65,6 +65,7 @@ func (c *NodeController) SetBenefitAddress() {
if err != nil {
c.ResponseInfo(500, "Write benefit file failed", "")
}
nm.IsUpdateBenefitAddr = true
}
c.ResponseInfo(200, "set benefit address successful", "")
}
......
......@@ -16,10 +16,12 @@ var (
HistoryBenefitAcc []*models.BenefitAddressStruct
RunningState *models.RunningState
IsRecvTask bool
IsUpdateBenefitAddr bool
)
func init() {
IsRecvTask = false
IsUpdateBenefitAddr = false
HistoryBenefitAcc = make([]*models.BenefitAddressStruct, 0)
RunningState = &models.RunningState{
RunningTime: time.Now().Unix(),
......
......@@ -81,10 +81,15 @@ func (m *MonitorNm) monitorNmClient() {
for {
if !IsRecvTask {
log.Warn("User set recv task status is false")
msgRespWorker.RegisterMsgResp(nodeManager, worker, RegisterInfoResp, nil)
msgRespWorker.RegisterMsgResp(nodeManager, worker, GoodbyeResp, nil)
nodeManager.UpdateStatus(false)
return
}
if IsUpdateBenefitAddr {
benefitAddrUpdateParam := utils.BuildParams(conf.GetConfig().BenefitAddress)
msgRespWorker.RegisterMsgResp(nodeManager, worker, BenefitAddrUpdateResp, benefitAddrUpdateParam)
IsUpdateBenefitAddr = false
}
sub := time.Now().Sub(nodeManager.GetLastHeartTime()).Seconds()
log.WithField("time(uint seconds)", sub).Info("Handler nm msg thread monitor heartbeat time")
rev, err := worker.Recv()
......
......@@ -61,7 +61,7 @@ func (n *NodeManagerHandler) DistributionMsgWorker(nodeManagerMsgChan chan *node
taskMsg := rev.GetPushTask()
if taskMsg != nil {
go func(msgRespWorker *RespMsgWorker, taskMsgWorker *TaskWorker, taskMsg *nodemanagerV2.PushTaskMessage) {
isCanExecute, bootUpTime, queueWaitTime, executeTime := taskMsgWorker.GetAckResp(taskMsg)
isCanExecute, bootUpTime, queueWaitTime, executeTime, imageName := taskMsgWorker.GetAckResp(taskMsg)
ackParams := utils.BuildParams(taskMsg.TaskId, isCanExecute, bootUpTime, queueWaitTime, executeTime)
msgRespWorker.RegisterMsgResp(n.nodeManager, n.worker, RespTaskAck, ackParams)
if !isCanExecute {
......@@ -108,7 +108,12 @@ func (n *NodeManagerHandler) DistributionMsgWorker(nodeManagerMsgChan chan *node
RunningState.CompletedTaskCount++
log.Info("Completed task count: ", RunningState.CompletedTaskCount)
log.Info("--------------taskMsg--------------:", taskMsg)
msgRespWorker.RegisterMsgResp(n.nodeManager, n.worker, GpuUsageResp, ackParams)
msgRespWorker.RegisterMsgResp(n.nodeManager, n.worker, GpuUsageResp, nil)
model, _ := db.GetModel(imageName)
if model != nil {
runningModelStatusParam := utils.BuildParams(model)
msgRespWorker.RegisterMsgResp(n.nodeManager, n.worker, RunningModelStatusResp, runningModelStatusParam)
}
}(n.msgRespWorker, n.taskMsgWorker, taskMsg)
continue
}
......@@ -175,7 +180,11 @@ func (n *NodeManagerHandler) DistributionMsgWorker(nodeManagerMsgChan chan *node
HostIp: models.ZeroHost,
HostPort: n.taskMsgWorker.getExternalPort(),
}
containerId, gpuSeq, err := dockerOp.CreateAndStartContainer(model, dockerCmd)
info := GetHardwareInfo()
if info == nil {
continue
}
containerId, gpuSeq, err := dockerOp.CreateAndStartContainer(info, model, dockerCmd)
if err != nil {
log.WithError(err).Error("Error creating container")
continue
......@@ -350,6 +359,8 @@ func (n *NodeManagerHandler) MonitorImageOp(op *nodemanagerV2.ModelOperate) {
model.ContainerId = ""
params := utils.BuildParams(strconv.FormatUint(model.TaskId, 10))
n.msgRespWorker.RegisterMsgResp(n.nodeManager, n.worker, DelModelRunningResp, params)
params = utils.BuildParams(strconv.FormatUint(model.TaskId, 10), model.LastRunTime)
n.msgRespWorker.RegisterMsgResp(n.nodeManager, n.worker, InstallModelStatusResp, params)
break
}
}
......
package nm
import (
"bytes"
"example.com/m/conf"
"example.com/m/largeModel"
"example.com/m/log"
......@@ -11,6 +10,7 @@ import (
"github.com/ethereum/go-ethereum/crypto"
nodemanagerV2 "github.com/odysseus/odysseus-protocol/gen/proto/go/nodemanager/v2"
"math/big"
"strconv"
"time"
)
......@@ -80,28 +80,30 @@ func HeartbeatResp(params ...interface{}) *nodemanagerV2.WorkerMessage {
func RegisterInfoResp(params ...interface{}) *nodemanagerV2.WorkerMessage {
log.Info("Register info response received params:", params)
info := &nodemanagerV2.NodeInfo{
MinerPubkey: conf.GetConfig().SignPub,
BenefitAddress: conf.GetConfig().BenefitAddress,
}
hardwareInfo := GetHardwareInfo()
modelsInfo := params[0].(*largeModel.ModelHandler)
readModels, err := modelsInfo.ScanModelsResp()
if err != nil {
log.WithError(err).Error("Error scanning models response failed")
return nil
}
nowTimeStamp := time.Now().Unix()
nowTimeBytes := big.NewInt(nowTimeStamp).Bytes()
signHash := crypto.Keccak256Hash(bytes.NewBufferString(conf.GetConfig().GetExternalIp()).Bytes(),
bytes.NewBufferString(conf.GetConfig().SignPub).Bytes(),
bytes.NewBufferString(conf.GetConfig().BenefitAddress).Bytes(),
signHash := crypto.Keccak256Hash([]byte(info.String()),
[]byte(hardwareInfo.String()),
[]byte(readModels.String()),
nowTimeBytes)
log.WithField("hash", signHash.String()).Info("register message sign result")
sign, _ := crypto.Sign(signHash.Bytes(), conf.GetConfig().SignPrivateKey)
log.Info("register message sign:", common.Bytes2Hex(sign))
modelsInfo := params[0].(*largeModel.ModelHandler)
readModels, err := modelsInfo.ScanModelsResp()
if err != nil {
return nil
}
hardwareInfo := GetHardwareInfo()
nodeInfoRes := &nodemanagerV2.WorkerMessage{
Message: &nodemanagerV2.WorkerMessage_RegisteMessage{
RegisteMessage: &nodemanagerV2.RegisteMessage{
Info: &nodemanagerV2.NodeInfo{
MinerPubkey: conf.GetConfig().SignPub,
BenefitAddress: conf.GetConfig().BenefitAddress,
},
Info: info,
Hardware: hardwareInfo,
Models: readModels,
Timestamp: nowTimeStamp,
......@@ -355,6 +357,53 @@ func DelModelRunningResp(params ...interface{}) *nodemanagerV2.WorkerMessage {
return delModelRunningRes
}
// BenefitAddrUpdateResp builds the worker message that notifies the node
// manager of the miner's current benefit address.
// params[0] must be the benefit address string (panics otherwise, matching
// the other Resp builders in this file).
func BenefitAddrUpdateResp(params ...interface{}) *nodemanagerV2.WorkerMessage {
	log.Info("Benefit addr update response received params:", params)
	benefitAddr := params[0].(string)
	update := &nodemanagerV2.BenefitAddrUpdate{
		BenefitAddress: benefitAddr,
	}
	msg := &nodemanagerV2.WorkerMessage{
		Message: &nodemanagerV2.WorkerMessage_BenefitAddrUpdate{
			BenefitAddrUpdate: update,
		},
	}
	log.Info("---------------------------------------Send Benefit addr update response msg ------------------------------------")
	return msg
}
// RunningModelStatusResp builds the worker message that reports runtime
// statistics for a running model: its task id, last work time, total run
// count, and estimated execution time.
// params[0] must be a *models.ModelInfo (panics otherwise, matching the
// other Resp builders in this file).
func RunningModelStatusResp(params ...interface{}) *nodemanagerV2.WorkerMessage {
	log.Info("Running Model Status response received params:", params)
	modelInfo := params[0].(*models.ModelInfo)
	status := &nodemanagerV2.RunningModelStatus{
		// Protocol carries the model id as a decimal string.
		ModelId:       strconv.FormatUint(modelInfo.TaskId, 10),
		LastWorkTime:  modelInfo.LastWorkTime,
		TotalRunCount: modelInfo.TotalRunCount,
		ExecTime:      modelInfo.EstimatExeTime,
	}
	msg := &nodemanagerV2.WorkerMessage{
		Message: &nodemanagerV2.WorkerMessage_RunningModelStatus{
			RunningModelStatus: status,
		},
	}
	log.Info("---------------------------------------Send running model status response msg ------------------------------------")
	return msg
}
// InstallModelStatusResp builds the worker message that reports the status
// of an installed model: its id and the last time it was run.
// params[0] must be the model id string and params[1] the last run time as
// an int64 (panics otherwise, matching the other Resp builders in this file).
func InstallModelStatusResp(params ...interface{}) *nodemanagerV2.WorkerMessage {
	log.Info("Install Model Status response received params:", params)
	id := params[0].(string)
	runTime := params[1].(int64)
	status := &nodemanagerV2.InstalledModelStatus{
		ModelId:     id,
		LastRunTime: runTime,
	}
	msg := &nodemanagerV2.WorkerMessage{
		Message: &nodemanagerV2.WorkerMessage_InstalledModelStatus{
			InstalledModelStatus: status,
		},
	}
	log.Info("---------------------------------------Send install model status response msg ------------------------------------")
	return msg
}
func GetHardwareInfo() *nodemanagerV2.HardwareInfo {
hardwareInfo := utils.GetHardwareInfo(conf.GetConfig().HardwareUrl)
if hardwareInfo == nil {
......
......@@ -182,7 +182,14 @@ func (t *TaskWorker) ComputeTaskHandler(taskMsg *nodemanagerV2.PushTaskMessage)
if !running {
taskOp.taskCmd.DockerCmd.HostIp = models.ZeroHost
taskOp.taskCmd.DockerCmd.HostPort = t.getExternalPort()
containerId, gpuSeq, err := t.DockerOp.CreateAndStartContainer(model, taskOp.taskCmd.DockerCmd)
info := GetHardwareInfo()
if info == nil {
log.Error("Error getting hardware info")
taskOp.taskExecResult.TaskExecError = fmt.Sprintf("%s", "Error getting hardware info")
t.ExecTaskIdIsFinished.Store(taskMsg.TaskId, true)
return
}
containerId, gpuSeq, err := t.DockerOp.CreateAndStartContainer(info, model, taskOp.taskCmd.DockerCmd)
if err != nil {
log.Errorf("Create and start container failed: %s", err.Error())
taskOp.taskExecResult.TaskExecError = fmt.Sprintf("%s,%s", "Create and start container failed", err.Error())
......@@ -217,7 +224,7 @@ func (t *TaskWorker) ComputeTaskHandler(taskMsg *nodemanagerV2.PushTaskMessage)
log.Info("----------------------Compute task exec done--------------------------------")
}
func (t *TaskWorker) GetAckResp(taskMsg *nodemanagerV2.PushTaskMessage) (isCanExecute bool, bootUpTime, queueWaitTime, executeTime int64) {
func (t *TaskWorker) GetAckResp(taskMsg *nodemanagerV2.PushTaskMessage) (isCanExecute bool, bootUpTime, queueWaitTime, executeTime int64, imageName string) {
if t.IsExecStandardTask {
isCanExecute = true
return
......@@ -237,7 +244,7 @@ func (t *TaskWorker) GetAckResp(taskMsg *nodemanagerV2.PushTaskMessage) (isCanEx
if !isSuccess && !t.lastExecTaskStartTime.IsZero() {
lastTaskImageInfo, err := db.GetModel(t.lastExecTaskImageName)
if err != nil {
return false, 0, 0, 0
return false, 0, 0, 0, ""
}
since := time.Since(t.lastExecTaskStartTime)
queueWaitTime = int64(lastTaskImageInfo.EstimatExeTime - int32(since.Seconds()))
......@@ -258,12 +265,13 @@ func (t *TaskWorker) GetAckResp(taskMsg *nodemanagerV2.PushTaskMessage) (isCanEx
isCanExecute = true
modelInfo, err := db.GetModel(t.lastExecTaskImageName)
if err != nil {
return false, 0, 0, 0
return false, 0, 0, 0, ""
}
if modelInfo != nil {
bootUpTime = modelInfo.StartUpTime
executeTime = int64(modelInfo.EstimatExeTime)
}
imageName = modelInfo.ImageName
return
}
......
......@@ -125,8 +125,8 @@ func (d *DockerOp) ListContainer() []types.Container {
return containers
}
func (d *DockerOp) CreateAndStartContainer(modelInfo *models.ModelInfo, dockerCmd *models.DockerCmd) (string, int32, error) {
gpuSeq := d.checkGpuUsage(modelInfo, dockerCmd)
func (d *DockerOp) CreateAndStartContainer(info *nodemanagerV2.HardwareInfo, modelInfo *models.ModelInfo, dockerCmd *models.DockerCmd) (string, int32, error) {
gpuSeq := d.checkGpuUsage(info, modelInfo, dockerCmd)
containerId, err := d.CreateContainer(modelInfo.ImageName, dockerCmd)
if err != nil {
log.Error("Error creating container image failed: ", err)
......@@ -391,11 +391,7 @@ func (d *DockerOp) getContainerInfo(id string) (types.Container, error) {
return types.Container{}, fmt.Errorf("get container info failed")
}
func (d *DockerOp) checkGpuUsage(modelInfo *models.ModelInfo, dockerCmd *models.DockerCmd) int32 {
info := nm.GetHardwareInfo()
if info == nil {
return 0
}
func (d *DockerOp) checkGpuUsage(info *nodemanagerV2.HardwareInfo, modelInfo *models.ModelInfo, dockerCmd *models.DockerCmd) int32 {
envMap := make(map[string]string, 0)
gpu := info.GPU
isMatch := false
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment