Commit cbda927b authored by duanjinfei's avatar duanjinfei

update task info

parent 5f4d56dc
......@@ -5,6 +5,7 @@ const (
Chat = "chat"
Picture = "picture"
Language = "language"
TaskType = "taskType"
ContainerSign = "container"
MinerSign = "miner"
ReqHash = "reqHash"
......
......@@ -197,7 +197,7 @@ func SubmitResultResp(params ...interface{}) *nodemanagerV1.WorkerMessage {
submitResultMsgRes := &nodemanagerV1.WorkerMessage{
Message: &nodemanagerV1.WorkerMessage_SubmitTaskResult{
SubmitTaskResult: &nodemanagerV1.SubmitTaskResult{
TaskUuid: taskId,
TaskId: taskId,
ContainerSignature: containerSign,
MinerSignature: minerSign,
TaskResultHeader: taskResultHeader,
......@@ -210,3 +210,16 @@ func SubmitResultResp(params ...interface{}) *nodemanagerV1.WorkerMessage {
log.Info("---------------------------------------Send task result msg ------------------------------------")
return submitResultMsgRes
}
func FetchStandardTaskResp(params ...interface{}) *nodemanagerV1.WorkerMessage {
//log.Info("Handler task submit result resp received params:", params)
fetchStandardTaskMsgRes := &nodemanagerV1.WorkerMessage{
Message: &nodemanagerV1.WorkerMessage_FetchStandardTask{
FetchStandardTask: &nodemanagerV1.FetchStandardTask{
TaskType: 11,
},
},
}
log.Info("---------------------------------------Send fetch standard task msg ------------------------------------")
return fetchStandardTaskMsgRes
}
......@@ -144,6 +144,8 @@ func monitorWorker(op *operate.DockerOp) {
go proofWorker.CommitWitness()
log.Info("Proof commit worker started")
//go handlerStandardTask(nodeManager, worker, msgRespWorker, taskMsgWorker)
// 处理消息
for i := 0; i < 5; i++ {
go handlerMsg(nodeManager, worker, msgRespWorker, taskMsgWorker, proofWorker)
......@@ -183,6 +185,26 @@ func monitorWorker(op *operate.DockerOp) {
}
}
func handlerStandardTask(nodeManager *models.NodeManagerClient,
worker nodeManagerV1.NodeManagerService_RegisterWorkerClient,
msgRespWorker *RespMsgWorker,
taskMsgWorker *TaskHandler) {
for {
for taskMsgWorker.IsExecStandardTask {
}
noExecAiTaskTime := time.Now()
for !taskMsgWorker.IsExecAiTask {
since := time.Since(noExecAiTaskTime)
if since.Seconds() == 5 {
msgRespWorker.RegisterMsgResp(nodeManager, worker, FetchStandardTaskResp, nil)
taskMsgWorker.IsExecStandardTask = true
noExecAiTaskTime = time.Now()
break
}
}
}
}
// handlerMsg 通过 goroutine 处理Msg
func handlerMsg(nodeManager *models.NodeManagerClient,
worker nodeManagerV1.NodeManagerService_RegisterWorkerClient,
......@@ -217,21 +239,22 @@ func handlerMsg(nodeManager *models.NodeManagerClient,
taskMsgWorker.Wg.Add(1)
taskMsgWorker.TaskMsg <- taskMsg
taskMsgWorker.Wg.Wait()
taskResHeader := taskMsgWorker.TaskRespHeader[taskMsg.TaskUuid]
taskResBody := taskMsgWorker.TaskRespBody[taskMsg.TaskUuid]
taskResExecTime := taskMsgWorker.TaskExecTime[taskMsg.TaskUuid]
isSuccess := taskMsgWorker.TaskIsSuccess[taskMsg.TaskUuid]
taskResHeader := taskMsgWorker.TaskRespHeader[taskMsg.TaskId]
taskResBody := taskMsgWorker.TaskRespBody[taskMsg.TaskId]
taskResExecTime := taskMsgWorker.TaskExecTime[taskMsg.TaskId]
isSuccess := taskMsgWorker.TaskIsSuccess[taskMsg.TaskId]
containerSign := taskMsgWorker.DockerOp.GetContainerSign(taskMsg, taskResBody)
if containerSign == nil || len(containerSign) == 0 {
log.Error("Container signing failed................")
isSuccess = false
}
reqHash, respHash, minerSign := taskMsgWorker.GetMinerSign(taskMsg, taskResBody)
params := buildParams(taskMsg.TaskUuid, containerSign, minerSign, taskResHeader, taskResBody, taskResExecTime, isSuccess)
taskMsgWorker.LruCache.Add(taskMsg.TaskUuid+models.ContainerSign, containerSign)
taskMsgWorker.LruCache.Add(taskMsg.TaskUuid+models.MinerSign, minerSign)
taskMsgWorker.LruCache.Add(taskMsg.TaskUuid+models.ReqHash, reqHash)
taskMsgWorker.LruCache.Add(taskMsg.TaskUuid+models.RespHash, respHash)
params := buildParams(taskMsg.TaskId, containerSign, minerSign, taskResHeader, taskResBody, taskResExecTime, isSuccess)
taskMsgWorker.LruCache.Add(taskMsg.TaskId+models.TaskType, taskMsg.TaskType)
taskMsgWorker.LruCache.Add(taskMsg.TaskId+models.ContainerSign, containerSign)
taskMsgWorker.LruCache.Add(taskMsg.TaskId+models.MinerSign, minerSign)
taskMsgWorker.LruCache.Add(taskMsg.TaskId+models.ReqHash, reqHash)
taskMsgWorker.LruCache.Add(taskMsg.TaskId+models.RespHash, respHash)
msgRespWorker.RegisterMsgResp(nodeManager, worker, SubmitResultResp, params)
log.Info("--------------taskMsg--------------:", taskMsg)
}(msgRespWorker, taskMsgWorker, taskMsg)
......@@ -240,23 +263,24 @@ func handlerMsg(nodeManager *models.NodeManagerClient,
nmSignMsg := rev.GetProofTaskResult()
if nmSignMsg != nil {
containerSign, _ := taskMsgWorker.LruCache.Get(nmSignMsg.TaskUuid + models.ContainerSign)
containerSign, _ := taskMsgWorker.LruCache.Get(nmSignMsg.TaskId + models.ContainerSign)
//if !ok {
// log.Error("taskMsgWorker.LruCache.Get failed: ", nmSignMsg.TaskUuid+models.ContainerSign)
//}
minerSign, _ := taskMsgWorker.LruCache.Get(nmSignMsg.TaskUuid + models.MinerSign)
minerSign, _ := taskMsgWorker.LruCache.Get(nmSignMsg.TaskId + models.MinerSign)
//if !ok {
// log.Error("taskMsgWorker.LruCache.Get failed: ", nmSignMsg.TaskUuid+models.MinerSign)
//}
reqHash, _ := taskMsgWorker.LruCache.Get(nmSignMsg.TaskUuid + models.ReqHash)
reqHash, _ := taskMsgWorker.LruCache.Get(nmSignMsg.TaskId + models.ReqHash)
//if !ok {
// log.Error("taskMsgWorker.LruCache.Get failed: ", nmSignMsg.TaskUuid+models.ReqHash)
//}
respHash, _ := taskMsgWorker.LruCache.Get(nmSignMsg.TaskUuid + models.RespHash)
respHash, _ := taskMsgWorker.LruCache.Get(nmSignMsg.TaskId + models.RespHash)
//if !ok {
// log.Error("taskMsgWorker.LruCache.Get failed: ", nmSignMsg.TaskUuid+models.RespHash)
//}
proofWorker.ProductProof(nmSignMsg.TaskUuid, nmSignMsg.Workload, reqHash.([]byte), respHash.([]byte), containerSign.([]byte), minerSign.([]byte), nmSignMsg.ManagerSignature)
taskType, _ := taskMsgWorker.LruCache.Get(nmSignMsg.TaskId + models.TaskType)
proofWorker.ProductProof(nmSignMsg.TaskId, nmSignMsg.Workload, taskType.(uint64), reqHash.([]byte), respHash.([]byte), containerSign.([]byte), minerSign.([]byte), nmSignMsg.ManagerSignature)
log.Info(nmSignMsg)
continue
}
......
......@@ -22,16 +22,18 @@ import (
)
type TaskHandler struct {
Wg *sync.WaitGroup
LruCache *lru.Cache
DockerOp *operate.DockerOp
CmdOp *operate.Command
TaskMsg chan *nodeManagerV1.PushTaskMessage
TaskRespHeader map[string][]byte
TaskExecTime map[string]int64
TaskRespBody map[string][]byte
TaskIsSuccess map[string]bool
HttpClient *http.Client
Wg *sync.WaitGroup
LruCache *lru.Cache
DockerOp *operate.DockerOp
CmdOp *operate.Command
TaskMsg chan *nodeManagerV1.PushTaskMessage
TaskRespHeader map[string][]byte
TaskExecTime map[string]int64
TaskRespBody map[string][]byte
TaskIsSuccess map[string]bool
HttpClient *http.Client
IsExecAiTask bool
IsExecStandardTask bool
}
func NewTaskWorker(op *operate.DockerOp) *TaskHandler {
......@@ -45,6 +47,7 @@ func NewTaskWorker(op *operate.DockerOp) *TaskHandler {
TaskRespBody: make(map[string][]byte, 0),
TaskIsSuccess: make(map[string]bool, 0),
HttpClient: &http.Client{},
IsExecAiTask: false,
}
}
......@@ -55,20 +58,24 @@ func (t *TaskHandler) HandlerTask(runCount int) {
select {
case taskMsg := <-t.TaskMsg:
{
switch taskMsg.TaskType {
case baseV1.TaskType_SystemTask:
switch taskMsg.TaskKind {
case baseV1.TaskKind_SystemTask:
{
//command := operate.GetSystemCommand(taskMsg.TaskCmd, taskMsg.TaskParam, taskMsg.TaskUuid+".sh")
//command := operate.GetSystemCommand(taskMsg.TaskCmd, taskMsg.TaskParam, taskMsg.TaskId+".sh")
t.SystemTaskHandler(taskMsg)
}
case baseV1.TaskType_ComputeTask:
case baseV1.TaskKind_ComputeTask:
{
t.ComputeTaskHandler(taskMsg)
}
case baseV1.TaskType_CustomTask:
case baseV1.TaskKind_CustomTask:
{
t.CustomTaskHandler(taskMsg)
}
case baseV1.TaskKind_StandardTask:
{
t.StandardTaskHandler(taskMsg)
}
}
}
}
......@@ -85,10 +92,14 @@ func (t *TaskHandler) SystemTaskHandler(taskMsg *nodeManagerV1.PushTaskMessage)
func (t *TaskHandler) ComputeTaskHandler(taskMsg *nodeManagerV1.PushTaskMessage) {
defer t.Wg.Done()
t.TaskRespBody[taskMsg.TaskUuid] = nil
t.TaskRespHeader[taskMsg.TaskUuid] = nil
t.TaskExecTime[taskMsg.TaskUuid] = 0
t.TaskIsSuccess[taskMsg.TaskUuid] = false
if t.IsExecStandardTask {
//todo: 停止标准任务容器
}
t.IsExecAiTask = true
t.TaskRespBody[taskMsg.TaskId] = nil
t.TaskRespHeader[taskMsg.TaskId] = nil
t.TaskExecTime[taskMsg.TaskId] = 0
t.TaskIsSuccess[taskMsg.TaskId] = false
reader := bytes.NewReader(taskMsg.TaskParam)
taskCmd := &models.TaskCmd{}
err := json.Unmarshal(bytes.NewBufferString(taskMsg.TaskCmd).Bytes(), taskCmd)
......@@ -187,14 +198,27 @@ func (t *TaskHandler) ComputeTaskHandler(taskMsg *nodeManagerV1.PushTaskMessage)
log.Error("received error: ", err)
return
}
t.TaskRespHeader[taskMsg.TaskUuid] = headers
t.TaskRespBody[taskMsg.TaskUuid] = readBody
t.TaskIsSuccess[taskMsg.TaskUuid] = true
t.TaskExecTime[taskMsg.TaskUuid] = endAfterTaskTime.Microseconds()
t.TaskRespHeader[taskMsg.TaskId] = headers
t.TaskRespBody[taskMsg.TaskId] = readBody
t.TaskIsSuccess[taskMsg.TaskId] = true
t.TaskExecTime[taskMsg.TaskId] = endAfterTaskTime.Microseconds()
}
t.IsExecAiTask = false
log.Info("received computeTask--------------------------------")
}
func (t *TaskHandler) StandardTaskHandler(taskMsg *nodeManagerV1.PushTaskMessage) {
defer t.Wg.Done()
t.TaskRespBody[taskMsg.TaskId] = nil
t.TaskRespHeader[taskMsg.TaskId] = nil
t.TaskExecTime[taskMsg.TaskId] = 0
t.TaskIsSuccess[taskMsg.TaskId] = false
//todo: 执行标准任务
t.IsExecStandardTask = false
log.Info("received customTask--------------------------------")
}
func (t *TaskHandler) CustomTaskHandler(taskMsg *nodeManagerV1.PushTaskMessage) {
defer t.Wg.Done()
_, err := t.DockerOp.PsImages()
......@@ -208,7 +232,7 @@ func (t *TaskHandler) CustomTaskHandler(taskMsg *nodeManagerV1.PushTaskMessage)
func (t *TaskHandler) GetMinerSign(msg *nodeManagerV1.PushTaskMessage, taskResult []byte) ([]byte, []byte, []byte) {
reqHash := crypto.Keccak256Hash(msg.TaskParam)
respHash := crypto.Keccak256Hash(taskResult)
signHash := crypto.Keccak256Hash(bytes.NewBufferString(msg.TaskUuid).Bytes(), reqHash.Bytes(), respHash.Bytes())
signHash := crypto.Keccak256Hash(bytes.NewBufferString(msg.TaskId).Bytes(), reqHash.Bytes(), respHash.Bytes())
sign, err := crypto.Sign(signHash.Bytes(), conf.GetConfig().SignPrivateKey)
if err != nil {
log.Error("custom task handler")
......
......@@ -58,7 +58,7 @@ func NewDockerOp() *DockerOp {
func (d *DockerOp) GetContainerSign(taskMsg *nodemanagerv1.PushTaskMessage, taskRes []byte) []byte {
reqBody := &models.TaskReq{
TaskId: taskMsg.TaskUuid,
TaskId: taskMsg.TaskId,
TaskParam: taskMsg.TaskParam,
TaskResult: taskRes,
}
......
......@@ -27,11 +27,12 @@ func NewProofWorker() *ProofWorker {
}
}
func (p *ProofWorker) ProductProof(taskId string, workLoad uint64, reqHash []byte, respHash []byte, containerSign, minerSign, nmSign []byte) {
func (p *ProofWorker) ProductProof(taskId string, workLoad, taskType uint64, reqHash []byte, respHash []byte, containerSign, minerSign, nmSign []byte) {
log.Info("ProductProof received workLoad:", workLoad)
p.productProofChan <- &witnessV1.Proof{
Workload: workLoad,
TaskId: taskId,
TaskType: taskType,
ReqHash: reqHash,
RespHash: respHash,
ContainerSignature: containerSign,
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment