Commit ec2135f8 authored by duanjinfei's avatar duanjinfei

Merge branch 'master' into test

parents 67a75472 be150a98
...@@ -25,7 +25,7 @@ func NewModelHandler(dockerOp *operate.DockerOp) *ModelHandler { ...@@ -25,7 +25,7 @@ func NewModelHandler(dockerOp *operate.DockerOp) *ModelHandler {
} }
func (m *ModelHandler) MonitorModelInfo() { func (m *ModelHandler) MonitorModelInfo() {
ticker := time.NewTicker(time.Second * 10) ticker := time.NewTicker(time.Second * 1)
for { for {
select { select {
case <-ticker.C: case <-ticker.C:
...@@ -60,7 +60,6 @@ func (m *ModelHandler) MonitorModelInfo() { ...@@ -60,7 +60,6 @@ func (m *ModelHandler) MonitorModelInfo() {
continue continue
} }
reportTaskIds := make([]uint64, 0) reportTaskIds := make([]uint64, 0)
maxLong := uint64(0)
for _, modelInfo := range modelInfosResp { for _, modelInfo := range modelInfosResp {
if modelInfo.ImageName == "" { if modelInfo.ImageName == "" {
continue continue
...@@ -80,14 +79,10 @@ func (m *ModelHandler) MonitorModelInfo() { ...@@ -80,14 +79,10 @@ func (m *ModelHandler) MonitorModelInfo() {
} else { } else {
log.WithField("name", modelInfo.ImageName).Info("The image name is already") log.WithField("name", modelInfo.ImageName).Info("The image name is already")
reportTaskIds = append(reportTaskIds, modelInfo.TaskId) reportTaskIds = append(reportTaskIds, modelInfo.TaskId)
if modelInfo.TaskId > maxLong {
maxLong = modelInfo.TaskId
}
} }
m.dockerOp.SignApi[modelInfo.ImageName] = modelInfo.SignUrl m.dockerOp.SignApi[modelInfo.ImageName] = modelInfo.SignUrl
} }
m.dockerOp.ModelsInfo = modelInfosResp m.dockerOp.ModelsInfo = modelInfosResp
reportTaskIds = append(reportTaskIds, maxLong)
m.dockerOp.ReportTaskIds = reportTaskIds m.dockerOp.ReportTaskIds = reportTaskIds
ticker = time.NewTicker(time.Minute * 10) ticker = time.NewTicker(time.Minute * 10)
} }
......
...@@ -53,6 +53,16 @@ func (m *MonitorNm) monitorNmClient() { ...@@ -53,6 +53,16 @@ func (m *MonitorNm) monitorNmClient() {
proofWorker := validator.NewProofWorker() proofWorker := validator.NewProofWorker()
go func(dockerOp *operate.DockerOp) {
for {
if len(dockerOp.ReportTaskIds) > 0 {
params := utils.BuildParams(m.DockerOp.ReportTaskIds)
msgRespWorker.RegisterMsgResp(nodeManager, worker, SubmitResourceMapRes, params)
return
}
}
}(m.DockerOp)
msgRespWorker.RegisterMsgResp(nodeManager, worker, RegisterInfoResp, nil) msgRespWorker.RegisterMsgResp(nodeManager, worker, RegisterInfoResp, nil)
log.Info("------------------------Send register message ended------------------------") log.Info("------------------------Send register message ended------------------------")
......
...@@ -78,9 +78,8 @@ func HeartbeatResp(params ...interface{}) *nodemanagerV1.WorkerMessage { ...@@ -78,9 +78,8 @@ func HeartbeatResp(params ...interface{}) *nodemanagerV1.WorkerMessage {
func SubmitResourceMapRes(params ...interface{}) *nodemanagerV1.WorkerMessage { func SubmitResourceMapRes(params ...interface{}) *nodemanagerV1.WorkerMessage {
log.Info("Submit resource map response received params: ", params) log.Info("Submit resource map response received params: ", params)
taskIdIndexes := params[0].([]uint64) taskIdIndexes := params[0].([]uint64)
taskIdLength := taskIdIndexes[len(taskIdIndexes)-1] b := bitmap.New(1000000000)
b := bitmap.New(taskIdLength + 1) for i := 0; i < len(taskIdIndexes); i++ {
for i := 0; i < len(taskIdIndexes)-1; i++ {
taskIdIndex := taskIdIndexes[i] taskIdIndex := taskIdIndexes[i]
err := b.Set(taskIdIndex) err := b.Set(taskIdIndex)
if err != nil { if err != nil {
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment