Commit 195d6923 authored by duanjinfei's avatar duanjinfei

update func name

parent 6aa1d584
......@@ -44,14 +44,12 @@ func (m *MonitorNm) monitorNmClient() {
}
msgRespWorker := NewMsgRespWorker()
for i := 0; i < 2; i++ {
go msgRespWorker.SendMsg()
}
go msgRespWorker.SendMsgWorker()
log.Info("Send msg worker started.......................")
taskMsgWorker := NewTaskWorker(m.DockerOp)
taskMsgWorker.HandlerTask(4)
proofWorker := validator.NewProofWorker()
taskMsgWorker.DistributionTaskWorker(4)
log.Info("Distribution task worker started.......................")
go func(dockerOp *operate.DockerOp) {
for {
......@@ -69,13 +67,20 @@ func (m *MonitorNm) monitorNmClient() {
msgRespWorker.RegisterMsgResp(nodeManager, worker, DeviceInfoResp, nil)
log.Info("------------------------Send deviceInfo message ended------------------------")
params := utils.BuildParams(m.DockerOp.ReportTaskIds)
msgRespWorker.RegisterMsgResp(nodeManager, worker, SubmitResourceMapRes, params)
if len(m.DockerOp.ReportTaskIds) == 0 {
params := utils.BuildParams(m.DockerOp.ReportTaskIds)
msgRespWorker.RegisterMsgResp(nodeManager, worker, SubmitResourceMapRes, params)
}
log.Info("------------------------Send once-off message ended------------------------")
nodeManagerHandler := NewNodeManagerHandler(nodeManager, worker, msgRespWorker, taskMsgWorker)
log.Info("Report model info started")
go nodeManagerHandler.MonitorStandardTaskWorker()
log.Info("Monitor standard task worker started")
proofWorker := validator.NewProofWorker()
// 证明存储
//go proofWorker.ProofStorage()
//log.Info("Proof storage worker started")
......@@ -84,12 +89,9 @@ func (m *MonitorNm) monitorNmClient() {
//go proofWorker.CommitWitness()
//log.Info("Proof commit worker started")
go nodeManagerHandler.handlerStandardTask()
log.Info("Handler standard task worker started")
// 处理消息
for i := 0; i < 2; i++ {
go nodeManagerHandler.handlerMsg(m.NodeManagerMsgChan, proofWorker)
go nodeManagerHandler.DistributionMsgWorker(m.NodeManagerMsgChan, proofWorker)
}
log.Info("------------------------Start rev msg worker thread------------------------")
......
......@@ -27,7 +27,7 @@ func NewNodeManagerHandler(nodeManager *models.NodeManagerClient, worker nodeMan
}
}
func (n *NodeManagerHandler) handlerMsg(nodeManagerMsgChan chan *nodeManagerV1.ManagerMessage, proofWorker *validator.ProofWorker) {
func (n *NodeManagerHandler) DistributionMsgWorker(nodeManagerMsgChan chan *nodeManagerV1.ManagerMessage, proofWorker *validator.ProofWorker) {
for {
select {
case rev := <-nodeManagerMsgChan:
......@@ -159,7 +159,7 @@ func (n *NodeManagerHandler) handlerMsg(nodeManagerMsgChan chan *nodeManagerV1.M
}
}
func (n *NodeManagerHandler) handlerStandardTask() {
func (n *NodeManagerHandler) MonitorStandardTaskWorker() {
//ticker := time.NewTicker(time.Second * 30)
ticker := time.NewTicker(time.Minute * 5)
for {
......
......@@ -40,7 +40,7 @@ func (o *RespMsgWorker) RegisterMsgResp(nodeManager *models.NodeManagerClient, w
log.Info("----------------add msg response-------------")
}
func (o *RespMsgWorker) SendMsg() {
func (o *RespMsgWorker) SendMsgWorker() {
for {
select {
case pool := <-o.MsgPool:
......
......@@ -53,7 +53,7 @@ func NewTaskWorker(op *operate.DockerOp) *TaskHandler {
}
}
func (t *TaskHandler) HandlerTask(runCount int) {
func (t *TaskHandler) DistributionTaskWorker(runCount int) {
for i := 0; i < runCount; i++ {
go func(t *TaskHandler) {
for {
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment