Commit e6786a62 authored by duanjinfei's avatar duanjinfei

update submit taskid indexes resoursemap

parent dc4a4687
...@@ -49,6 +49,8 @@ func monitorModelInfo(dockerOp *operate.DockerOp) { ...@@ -49,6 +49,8 @@ func monitorModelInfo(dockerOp *operate.DockerOp) {
log.Error("Docker op ps images failed:", err) log.Error("Docker op ps images failed:", err)
continue continue
} }
reportTaskIds := make([]uint64, 0)
reportTaskIds = append(reportTaskIds, uint64(len(modelInfosResp)))
for _, modelInfo := range modelInfosResp { for _, modelInfo := range modelInfosResp {
if modelInfo.ImageName == "" { if modelInfo.ImageName == "" {
continue continue
...@@ -64,16 +66,16 @@ func monitorModelInfo(dockerOp *operate.DockerOp) { ...@@ -64,16 +66,16 @@ func monitorModelInfo(dockerOp *operate.DockerOp) {
if isPull { if isPull {
go dockerOp.PullImage(modelInfo) go dockerOp.PullImage(modelInfo)
modelInfo.IsImageExist = true modelInfo.IsImageExist = true
dockerOp.ModelTaskIdChan <- modelInfo.TaskId // todo: 是否立马上报数据
dockerOp.IsReportModelTaskId[modelInfo.TaskId] = true // reportTaskIds = append(reportTaskIds, modelInfo.TaskId)
} }
} else if !dockerOp.IsReportModelTaskId[modelInfo.TaskId] { } else {
dockerOp.ModelTaskIdChan <- modelInfo.TaskId reportTaskIds = append(reportTaskIds, modelInfo.TaskId)
dockerOp.IsReportModelTaskId[modelInfo.TaskId] = true
} }
dockerOp.SignApi[modelInfo.ImageName] = modelInfo.SignUrl dockerOp.SignApi[modelInfo.ImageName] = modelInfo.SignUrl
dockerOp.ModelsInfo = append(dockerOp.ModelsInfo, modelInfo) dockerOp.ModelsInfo = append(dockerOp.ModelsInfo, modelInfo)
} }
dockerOp.ModelTaskIdIndexesChan <- reportTaskIds
ticker = time.NewTicker(time.Minute * 10) ticker = time.NewTicker(time.Minute * 10)
} }
} }
...@@ -84,7 +86,7 @@ func reportModelInfo(nodeManager *models.NodeManagerClient, ...@@ -84,7 +86,7 @@ func reportModelInfo(nodeManager *models.NodeManagerClient,
msgRespWorker *RespMsgWorker, dockerOp *operate.DockerOp) { msgRespWorker *RespMsgWorker, dockerOp *operate.DockerOp) {
for { for {
select { select {
case taskId := <-dockerOp.ModelTaskIdChan: case taskId := <-dockerOp.ModelTaskIdIndexesChan:
if !nodeManager.Status { if !nodeManager.Status {
log.WithField("endpoint", nodeManager.Endpoint).Error("Node manager is down , stop report model info") log.WithField("endpoint", nodeManager.Endpoint).Error("Node manager is down , stop report model info")
return return
......
...@@ -44,6 +44,10 @@ func (o *RespMsgWorker) SendMsg() { ...@@ -44,6 +44,10 @@ func (o *RespMsgWorker) SendMsg() {
case pool := <-o.MsgPool: case pool := <-o.MsgPool:
{ {
workerMsg := pool.handler(pool.params...) workerMsg := pool.handler(pool.params...)
if workerMsg == nil {
log.Warn("Send to node manager workerMsg is nil")
continue
}
err := pool.workerClient.SendMsg(workerMsg) err := pool.workerClient.SendMsg(workerMsg)
if err != nil { if err != nil {
log.Error("Send msg to nm client failed:", err) log.Error("Send msg to nm client failed:", err)
...@@ -71,10 +75,19 @@ func HeartbeatResp(params ...interface{}) *nodemanagerV1.WorkerMessage { ...@@ -71,10 +75,19 @@ func HeartbeatResp(params ...interface{}) *nodemanagerV1.WorkerMessage {
func SubmitResourceMapRes(params ...interface{}) *nodemanagerV1.WorkerMessage { func SubmitResourceMapRes(params ...interface{}) *nodemanagerV1.WorkerMessage {
log.Info("Heartbeat response received params: ", params) log.Info("Heartbeat response received params: ", params)
taskIdIndex := params[0].(uint64) taskIdLength := params[0].(uint64)
b := bitmap.New(taskIdIndex) b := bitmap.New(taskIdLength)
for i := 1; i < len(params); i++ {
taskIdIndex := params[i].(uint64)
err := b.Set(taskIdIndex)
if err != nil {
log.WithField("taskId index", i).Error("Error setting task id index")
return nil
}
}
binary, err := b.MarshalBinary() binary, err := b.MarshalBinary()
if err != nil { if err != nil {
log.Error("bitmap marshal binary failed with error: ", err)
return nil return nil
} }
heartRes := &nodemanagerV1.WorkerMessage{ heartRes := &nodemanagerV1.WorkerMessage{
......
...@@ -23,14 +23,13 @@ import ( ...@@ -23,14 +23,13 @@ import (
var httpClient *http.Client var httpClient *http.Client
type DockerOp struct { type DockerOp struct {
IsHealthy bool IsHealthy bool
Reason string Reason string
dockerClient *client.Client dockerClient *client.Client
UsedExternalPort map[int64]bool UsedExternalPort map[int64]bool
SignApi map[string]string SignApi map[string]string
ModelsInfo []*models.ModelInfo ModelsInfo []*models.ModelInfo
IsReportModelTaskId map[uint64]bool ModelTaskIdIndexesChan chan []uint64
ModelTaskIdChan chan uint64
} }
func init() { func init() {
...@@ -46,14 +45,13 @@ func NewDockerOp() *DockerOp { ...@@ -46,14 +45,13 @@ func NewDockerOp() *DockerOp {
} }
} }
return &DockerOp{ return &DockerOp{
IsHealthy: true, IsHealthy: true,
Reason: "", Reason: "",
dockerClient: dockerClient, dockerClient: dockerClient,
SignApi: make(map[string]string, 0), SignApi: make(map[string]string, 0),
ModelsInfo: make([]*models.ModelInfo, 0), ModelsInfo: make([]*models.ModelInfo, 0),
IsReportModelTaskId: make(map[uint64]bool, 0), UsedExternalPort: make(map[int64]bool, 0),
UsedExternalPort: make(map[int64]bool, 0), ModelTaskIdIndexesChan: make(chan []uint64, 0),
ModelTaskIdChan: make(chan uint64, 0),
} }
} }
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment