Commit ea64d5cc authored by duanjinfei's avatar duanjinfei

add mutex

parent 92805fcd
...@@ -42,6 +42,7 @@ func monitorNodeManagerSeed() { ...@@ -42,6 +42,7 @@ func monitorNodeManagerSeed() {
} }
for _, node := range list.GetManagers() { for _, node := range list.GetManagers() {
if isExistNodeManager(node) { if isExistNodeManager(node) {
log.Warn("Node manager is already exist and updated")
continue continue
} }
nodeManagerArr = append(nodeManagerArr, &NodeManager{Info: node, IsUsed: false, IsExist: true}) nodeManagerArr = append(nodeManagerArr, &NodeManager{Info: node, IsUsed: false, IsExist: true})
......
...@@ -285,7 +285,9 @@ func handlerMsg(nodeManager *models.NodeManagerClient, ...@@ -285,7 +285,9 @@ func handlerMsg(nodeManager *models.NodeManagerClient,
taskMsgWorker.Wg.Add(1) taskMsgWorker.Wg.Add(1)
taskMsgWorker.TaskMsg <- taskMsg taskMsgWorker.TaskMsg <- taskMsg
taskMsgWorker.Wg.Wait() taskMsgWorker.Wg.Wait()
taskMsgWorker.Mutex.Lock()
taskExecResInterface, _ := taskMsgWorker.LruCache.Get(taskMsg.TaskId) taskExecResInterface, _ := taskMsgWorker.LruCache.Get(taskMsg.TaskId)
taskMsgWorker.Mutex.Unlock()
//log.WithField("result", taskExecResInterface).Info("lru cache get task result") //log.WithField("result", taskExecResInterface).Info("lru cache get task result")
taskExecRes := &models.TaskResult{ taskExecRes := &models.TaskResult{
TaskHttpStatusCode: 200, TaskHttpStatusCode: 200,
...@@ -305,11 +307,13 @@ func handlerMsg(nodeManager *models.NodeManagerClient, ...@@ -305,11 +307,13 @@ func handlerMsg(nodeManager *models.NodeManagerClient,
} }
reqHash, respHash, minerSign := taskMsgWorker.GetMinerSign(taskMsg, taskExecRes.TaskRespBody) reqHash, respHash, minerSign := taskMsgWorker.GetMinerSign(taskMsg, taskExecRes.TaskRespBody)
params := buildParams(taskMsg.TaskId, containerSign, minerSign, taskExecRes, isSuccess) params := buildParams(taskMsg.TaskId, containerSign, minerSign, taskExecRes, isSuccess)
taskMsgWorker.Mutex.Lock()
taskMsgWorker.LruCache.Add(taskMsg.TaskId+models.TaskType, taskMsg.TaskType) taskMsgWorker.LruCache.Add(taskMsg.TaskId+models.TaskType, taskMsg.TaskType)
taskMsgWorker.LruCache.Add(taskMsg.TaskId+models.ContainerSign, containerSign) taskMsgWorker.LruCache.Add(taskMsg.TaskId+models.ContainerSign, containerSign)
taskMsgWorker.LruCache.Add(taskMsg.TaskId+models.MinerSign, minerSign) taskMsgWorker.LruCache.Add(taskMsg.TaskId+models.MinerSign, minerSign)
taskMsgWorker.LruCache.Add(taskMsg.TaskId+models.ReqHash, reqHash) taskMsgWorker.LruCache.Add(taskMsg.TaskId+models.ReqHash, reqHash)
taskMsgWorker.LruCache.Add(taskMsg.TaskId+models.RespHash, respHash) taskMsgWorker.LruCache.Add(taskMsg.TaskId+models.RespHash, respHash)
taskMsgWorker.Mutex.Unlock()
msgRespWorker.RegisterMsgResp(nodeManager, worker, SubmitResultResp, params) msgRespWorker.RegisterMsgResp(nodeManager, worker, SubmitResultResp, params)
log.Info("--------------taskMsg--------------:", taskMsg) log.Info("--------------taskMsg--------------:", taskMsg)
}(msgRespWorker, taskMsgWorker, taskMsg) }(msgRespWorker, taskMsgWorker, taskMsg)
......
...@@ -27,6 +27,7 @@ import ( ...@@ -27,6 +27,7 @@ import (
type TaskHandler struct { type TaskHandler struct {
Wg *sync.WaitGroup Wg *sync.WaitGroup
Mutex *sync.Mutex
LruCache *lru.Cache LruCache *lru.Cache
DockerOp *operate.DockerOp DockerOp *operate.DockerOp
CmdOp *operate.Command CmdOp *operate.Command
...@@ -41,6 +42,7 @@ var oldTaskImageName string ...@@ -41,6 +42,7 @@ var oldTaskImageName string
func NewTaskWorker(op *operate.DockerOp) *TaskHandler { func NewTaskWorker(op *operate.DockerOp) *TaskHandler {
return &TaskHandler{ return &TaskHandler{
Wg: &sync.WaitGroup{}, Wg: &sync.WaitGroup{},
Mutex: &sync.Mutex{},
LruCache: lru.New(100), LruCache: lru.New(100),
DockerOp: op, DockerOp: op,
TaskMsg: make(chan *nodeManagerV1.PushTaskMessage, 0), TaskMsg: make(chan *nodeManagerV1.PushTaskMessage, 0),
...@@ -288,7 +290,9 @@ func (t *TaskHandler) ComputeTaskHandler(taskMsg *nodeManagerV1.PushTaskMessage) ...@@ -288,7 +290,9 @@ func (t *TaskHandler) ComputeTaskHandler(taskMsg *nodeManagerV1.PushTaskMessage)
} else if taskMsg.TaskKind == baseV1.TaskKind_StandardTask { } else if taskMsg.TaskKind == baseV1.TaskKind_StandardTask {
t.IsExecStandardTask = false t.IsExecStandardTask = false
} }
t.Mutex.Lock()
t.LruCache.Add(taskMsg.TaskId, taskExecResult) t.LruCache.Add(taskMsg.TaskId, taskExecResult)
t.Mutex.Unlock()
//log.WithField("result", taskExecResult).Info("lru cache storage task result") //log.WithField("result", taskExecResult).Info("lru cache storage task result")
log.Info("received computeTask--------------------------------") log.Info("received computeTask--------------------------------")
} }
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment