msg_handler.go 7.42 KB
Newer Older
duanjinfei's avatar
duanjinfei committed
1 2 3 4 5 6
package nm

import (
	"example.com/m/conf"
	"example.com/m/log"
	"example.com/m/models"
7
	"example.com/m/operate"
duanjinfei's avatar
duanjinfei committed
8 9 10 11 12 13 14 15 16 17 18
	"example.com/m/utils"
	"example.com/m/validator"
	"fmt"
	nodeManagerV1 "github.com/odysseus/odysseus-protocol/gen/proto/go/nodemanager/v1"
	"time"
)

type NodeManagerHandler struct {
	nodeManager   *models.NodeManagerClient
	worker        nodeManagerV1.NodeManagerService_RegisterWorkerClient
	msgRespWorker *RespMsgWorker
duanjinfei's avatar
duanjinfei committed
19
	taskMsgWorker *TaskWorker
duanjinfei's avatar
duanjinfei committed
20 21
}

duanjinfei's avatar
duanjinfei committed
22
func NewNodeManagerHandler(nodeManager *models.NodeManagerClient, worker nodeManagerV1.NodeManagerService_RegisterWorkerClient, msgRespWorker *RespMsgWorker, taskMsgWorker *TaskWorker) *NodeManagerHandler {
duanjinfei's avatar
duanjinfei committed
23 24 25 26 27 28 29 30
	return &NodeManagerHandler{
		nodeManager:   nodeManager,
		worker:        worker,
		msgRespWorker: msgRespWorker,
		taskMsgWorker: taskMsgWorker,
	}
}

duanjinfei's avatar
duanjinfei committed
31
func (n *NodeManagerHandler) DistributionMsgWorker(nodeManagerMsgChan chan *nodeManagerV1.ManagerMessage, proofWorker *validator.ProofWorker) {
duanjinfei's avatar
duanjinfei committed
32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50
	for {
		select {
		case rev := <-nodeManagerMsgChan:
			{
				if !n.nodeManager.Status {
					log.Warn("handlerMsg -> node manager is not running")
					return
				}
				heartbeatReq := rev.GetHeartbeatRequest()
				if heartbeatReq != nil {
					n.nodeManager.UpdateLastHeartTime(time.Now())
					params := utils.BuildParams(heartbeatReq.Timestamp)
					n.msgRespWorker.RegisterMsgResp(n.nodeManager, n.worker, HeartbeatResp, params)
					log.Info("-------------Heart beat req:-------------", heartbeatReq)
					continue
				}

				taskMsg := rev.GetPushTaskMessage()
				if taskMsg != nil {
duanjinfei's avatar
duanjinfei committed
51 52 53 54
					go func(msgRespWorker *RespMsgWorker, taskMsgWorker *TaskWorker, taskMsg *nodeManagerV1.PushTaskMessage) {
						isCanExecute, bootUpTime, queueWaitTime, executeTime := taskMsgWorker.GetAckResp(taskMsg)
						ackParams := utils.BuildParams(taskMsg.TaskId, isCanExecute, bootUpTime, queueWaitTime, executeTime)
						msgRespWorker.RegisterMsgResp(n.nodeManager, n.worker, RespTaskAck, ackParams)
55
						if !isCanExecute {
duanjinfei's avatar
duanjinfei committed
56
							log.WithField("taskId", taskMsg.TaskId).Error("The task is not executed")
57 58
							return
						}
duanjinfei's avatar
duanjinfei committed
59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87
						taskMsgWorker.Wg.Add(1)
						taskMsgWorker.TaskMsg <- taskMsg
						taskMsgWorker.Wg.Wait()
						taskExecResInterface, _ := taskMsgWorker.LruCache.Get(taskMsg.TaskId)
						//log.WithField("result", taskExecResInterface).Info("lru cache get task result")
						taskExecRes := &models.TaskResult{
							TaskHttpStatusCode: 200,
							TaskRespBody:       nil,
							TaskHttpHeaders:    nil,
							TaskIsSuccess:      false,
							TaskExecTime:       0,
							TaskExecError:      "",
						}
						if taskExecResInterface != nil {
							taskExecRes = taskExecResInterface.(*models.TaskResult)
						}
						isSuccess := taskExecRes.TaskIsSuccess
						containerSign := make([]byte, 0)
						if taskExecRes.TaskRespBody != nil {
							containerSign = taskMsgWorker.DockerOp.GetContainerSign(taskMsg, taskExecRes.TaskRespBody)
							if containerSign == nil || len(containerSign) == 0 {
								log.Error("Container signing failed................")
								isSuccess = false
								taskExecRes.TaskExecError = fmt.Sprintf("%s-%s", "Container sign failed", taskExecRes.TaskExecError)
							}
						} else {
							isSuccess = false
							taskExecRes.TaskExecError = fmt.Sprintf("worker:%s-%s-%s", conf.GetConfig().SignPublicAddress.Hex(), "Task exec error", taskExecRes.TaskExecError)
						}
88
						_, _, minerSign := taskMsgWorker.GetMinerSign(taskMsg, taskExecRes.TaskRespBody)
duanjinfei's avatar
duanjinfei committed
89
						taskResultParams := utils.BuildParams(taskMsg.TaskId, containerSign, minerSign, taskExecRes, isSuccess)
90 91 92 93 94
						//taskMsgWorker.LruCache.Add(taskMsg.TaskId+models.TaskType, taskMsg.TaskType)
						//taskMsgWorker.LruCache.Add(taskMsg.TaskId+models.ContainerSign, containerSign)
						//taskMsgWorker.LruCache.Add(taskMsg.TaskId+models.MinerSign, minerSign)
						//taskMsgWorker.LruCache.Add(taskMsg.TaskId+models.ReqHash, reqHash)
						//taskMsgWorker.LruCache.Add(taskMsg.TaskId+models.RespHash, respHash)
duanjinfei's avatar
duanjinfei committed
95
						msgRespWorker.RegisterMsgResp(n.nodeManager, n.worker, SubmitResultResp, taskResultParams)
duanjinfei's avatar
duanjinfei committed
96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158
						log.Info("--------------taskMsg--------------:", taskMsg)
					}(n.msgRespWorker, n.taskMsgWorker, taskMsg)
					continue
				}

				nmResultMsg := rev.GetProofTaskResult()
				if nmResultMsg != nil {
					//containerSign, _ := taskMsgWorker.LruCache.Get(nmResultMsg.TaskId + models.ContainerSign)
					//if !ok {
					//	log.Error("taskMsgWorker.LruCache.Get failed: ", nmSignMsg.TaskUuid+models.ContainerSign)
					//}
					//minerSign, _ := taskMsgWorker.LruCache.Get(nmResultMsg.TaskId + models.MinerSign)
					//if !ok {
					//	log.Error("taskMsgWorker.LruCache.Get failed: ", nmSignMsg.TaskUuid+models.MinerSign)
					//}
					//reqHash, _ := taskMsgWorker.LruCache.Get(nmResultMsg.TaskId + models.ReqHash)
					//if !ok {
					//	log.Error("taskMsgWorker.LruCache.Get failed: ", nmSignMsg.TaskUuid+models.ReqHash)
					//}
					//respHash, _ := taskMsgWorker.LruCache.Get(nmResultMsg.TaskId + models.RespHash)
					//if !ok {
					//	log.Error("taskMsgWorker.LruCache.Get failed: ", nmSignMsg.TaskUuid+models.RespHash)
					//}
					//taskType, _ := taskMsgWorker.LruCache.Get(nmResultMsg.TaskId + models.TaskType)
					//proofWorker.ProductProof(nmResultMsg, taskType.(uint64), reqHash.([]byte), respHash.([]byte), containerSign.([]byte), minerSign.([]byte))
					log.WithField("proof", nmResultMsg).Info("Output proof task result")
					continue
				}

				deviceUsageMsg := rev.GetDeviceUsage()
				if deviceUsageMsg != nil {
					n.msgRespWorker.RegisterMsgResp(n.nodeManager, n.worker, DeviceUsageResp, nil)
					log.Info(deviceUsageMsg)
					continue
				}

				nodeInfoMsg := rev.GetNodeInfoRequest()
				if nodeInfoMsg != nil {
					n.msgRespWorker.RegisterMsgResp(n.nodeManager, n.worker, NodeInfoResp, nil)
					log.Info(nodeInfoMsg)
					continue
				}

				statusReqMsg := rev.GetStatusRequest()
				if statusReqMsg != nil {
					n.msgRespWorker.RegisterMsgResp(n.nodeManager, n.worker, StatusResp, nil)
					log.Info(statusReqMsg)
					continue
				}

				goodByeMsg := rev.GetGoodbyeMessage()
				if goodByeMsg != nil {
					reason := goodByeMsg.GetReason()
					log.Infof("Server endpoint:%s , good bye reason : %s", n.nodeManager.Endpoint, reason)
					n.nodeManager.UpdateStatus(false)
					log.Warn("Update nm status is false")
					continue
				}
			}
		}
	}
}

duanjinfei's avatar
duanjinfei committed
159
func (n *NodeManagerHandler) MonitorStandardTaskWorker() {
duanjinfei's avatar
duanjinfei committed
160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176
	//ticker := time.NewTicker(time.Second * 30)
	ticker := time.NewTicker(time.Minute * 5)
	for {
		select {
		case <-ticker.C:
			{
				if n.taskMsgWorker.IsExecStandardTask {
					continue
				}
				if !n.taskMsgWorker.IsExecAiTask {
					n.msgRespWorker.RegisterMsgResp(n.nodeManager, n.worker, FetchStandardTaskResp, nil)
					break
				}
			}
		}
	}
}
177 178 179 180 181 182

func (n *NodeManagerHandler) ReportResourceMap(dockerOp *operate.DockerOp) {
	ticker := time.NewTicker(time.Second * 1)
	for {
		select {
		case <-ticker.C:
duanjinfei's avatar
duanjinfei committed
183 184 185 186 187 188 189 190 191 192 193 194 195 196
			if len(dockerOp.ReportModelIds) > 0 {
				bootUpModelIds := make([]uint64, 0)
				containers := dockerOp.ListContainer()
				if containers != nil && len(containers) > 0 {
					for _, container := range containers {
						if container.State == "running" {
							taskId := dockerOp.BootUpModelId[container.Image]
							if taskId != 0 {
								bootUpModelIds = append(bootUpModelIds, taskId)
							}
						}
					}
				}
				params := utils.BuildParams(dockerOp.ReportModelIds, bootUpModelIds)
197
				n.msgRespWorker.RegisterMsgResp(n.nodeManager, n.worker, SubmitResourceMapRes, params)
duanjinfei's avatar
duanjinfei committed
198
				ticker.Reset(time.Minute * 10)
199 200 201 202
			}
		}
	}
}