Commit 0ef99653 authored by duanjinfei's avatar duanjinfei

Merge branch 'master' into test

parents ba4b8d09 195d6923
...@@ -44,14 +44,12 @@ func (m *MonitorNm) monitorNmClient() { ...@@ -44,14 +44,12 @@ func (m *MonitorNm) monitorNmClient() {
} }
msgRespWorker := NewMsgRespWorker() msgRespWorker := NewMsgRespWorker()
for i := 0; i < 2; i++ { go msgRespWorker.SendMsgWorker()
go msgRespWorker.SendMsg() log.Info("Send msg worker started.......................")
}
taskMsgWorker := NewTaskWorker(m.DockerOp) taskMsgWorker := NewTaskWorker(m.DockerOp)
taskMsgWorker.HandlerTask(4) taskMsgWorker.DistributionTaskWorker(4)
log.Info("Distribution task worker started.......................")
proofWorker := validator.NewProofWorker()
go func(dockerOp *operate.DockerOp) { go func(dockerOp *operate.DockerOp) {
for { for {
...@@ -69,13 +67,20 @@ func (m *MonitorNm) monitorNmClient() { ...@@ -69,13 +67,20 @@ func (m *MonitorNm) monitorNmClient() {
msgRespWorker.RegisterMsgResp(nodeManager, worker, DeviceInfoResp, nil) msgRespWorker.RegisterMsgResp(nodeManager, worker, DeviceInfoResp, nil)
log.Info("------------------------Send deviceInfo message ended------------------------") log.Info("------------------------Send deviceInfo message ended------------------------")
if len(m.DockerOp.ReportTaskIds) == 0 {
params := utils.BuildParams(m.DockerOp.ReportTaskIds) params := utils.BuildParams(m.DockerOp.ReportTaskIds)
msgRespWorker.RegisterMsgResp(nodeManager, worker, SubmitResourceMapRes, params) msgRespWorker.RegisterMsgResp(nodeManager, worker, SubmitResourceMapRes, params)
}
log.Info("------------------------Send once-off message ended------------------------") log.Info("------------------------Send once-off message ended------------------------")
nodeManagerHandler := NewNodeManagerHandler(nodeManager, worker, msgRespWorker, taskMsgWorker) nodeManagerHandler := NewNodeManagerHandler(nodeManager, worker, msgRespWorker, taskMsgWorker)
log.Info("Report model info started") log.Info("Report model info started")
go nodeManagerHandler.MonitorStandardTaskWorker()
log.Info("Monitor standard task worker started")
proofWorker := validator.NewProofWorker()
// 证明存储 // 证明存储
//go proofWorker.ProofStorage() //go proofWorker.ProofStorage()
//log.Info("Proof storage worker started") //log.Info("Proof storage worker started")
...@@ -84,12 +89,9 @@ func (m *MonitorNm) monitorNmClient() { ...@@ -84,12 +89,9 @@ func (m *MonitorNm) monitorNmClient() {
//go proofWorker.CommitWitness() //go proofWorker.CommitWitness()
//log.Info("Proof commit worker started") //log.Info("Proof commit worker started")
go nodeManagerHandler.handlerStandardTask()
log.Info("Handler standard task worker started")
// 处理消息 // 处理消息
for i := 0; i < 2; i++ { for i := 0; i < 2; i++ {
go nodeManagerHandler.handlerMsg(m.NodeManagerMsgChan, proofWorker) go nodeManagerHandler.DistributionMsgWorker(m.NodeManagerMsgChan, proofWorker)
} }
log.Info("------------------------Start rev msg worker thread------------------------") log.Info("------------------------Start rev msg worker thread------------------------")
......
...@@ -27,7 +27,7 @@ func NewNodeManagerHandler(nodeManager *models.NodeManagerClient, worker nodeMan ...@@ -27,7 +27,7 @@ func NewNodeManagerHandler(nodeManager *models.NodeManagerClient, worker nodeMan
} }
} }
func (n *NodeManagerHandler) handlerMsg(nodeManagerMsgChan chan *nodeManagerV1.ManagerMessage, proofWorker *validator.ProofWorker) { func (n *NodeManagerHandler) DistributionMsgWorker(nodeManagerMsgChan chan *nodeManagerV1.ManagerMessage, proofWorker *validator.ProofWorker) {
for { for {
select { select {
case rev := <-nodeManagerMsgChan: case rev := <-nodeManagerMsgChan:
...@@ -159,7 +159,7 @@ func (n *NodeManagerHandler) handlerMsg(nodeManagerMsgChan chan *nodeManagerV1.M ...@@ -159,7 +159,7 @@ func (n *NodeManagerHandler) handlerMsg(nodeManagerMsgChan chan *nodeManagerV1.M
} }
} }
func (n *NodeManagerHandler) handlerStandardTask() { func (n *NodeManagerHandler) MonitorStandardTaskWorker() {
//ticker := time.NewTicker(time.Second * 30) //ticker := time.NewTicker(time.Second * 30)
ticker := time.NewTicker(time.Minute * 5) ticker := time.NewTicker(time.Minute * 5)
for { for {
......
...@@ -40,7 +40,7 @@ func (o *RespMsgWorker) RegisterMsgResp(nodeManager *models.NodeManagerClient, w ...@@ -40,7 +40,7 @@ func (o *RespMsgWorker) RegisterMsgResp(nodeManager *models.NodeManagerClient, w
log.Info("----------------add msg response-------------") log.Info("----------------add msg response-------------")
} }
func (o *RespMsgWorker) SendMsg() { func (o *RespMsgWorker) SendMsgWorker() {
for { for {
select { select {
case pool := <-o.MsgPool: case pool := <-o.MsgPool:
......
...@@ -53,7 +53,7 @@ func NewTaskWorker(op *operate.DockerOp) *TaskHandler { ...@@ -53,7 +53,7 @@ func NewTaskWorker(op *operate.DockerOp) *TaskHandler {
} }
} }
func (t *TaskHandler) HandlerTask(runCount int) { func (t *TaskHandler) DistributionTaskWorker(runCount int) {
for i := 0; i < runCount; i++ { for i := 0; i < runCount; i++ {
go func(t *TaskHandler) { go func(t *TaskHandler) {
for { for {
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment