Commit 970c86eb authored by duanjinfei

update monitor node manager seed

parent d1925c9f
...@@ -23,6 +23,7 @@ type Config struct { ...@@ -23,6 +23,7 @@ type Config struct {
TaskValidatorTime float64 `json:"task_validator_time"` TaskValidatorTime float64 `json:"task_validator_time"`
BenefitAddress string `json:"benefit_address"` BenefitAddress string `json:"benefit_address"`
ContainerNum int64 `json:"container_num"` ContainerNum int64 `json:"container_num"`
NodeManagerNum int64 `json:"node_manager_num"`
} }
var _cfg *Config = nil var _cfg *Config = nil
......
...@@ -3,6 +3,7 @@ ...@@ -3,6 +3,7 @@
"sign_prv": "0e80b06d24d7543b3e2520c91d25997bcf5e0e9e6361910cea6ab268c2db3600", "sign_prv": "0e80b06d24d7543b3e2520c91d25997bcf5e0e9e6361910cea6ab268c2db3600",
"docker_server": "tcp://192.168.1.120:2375", "docker_server": "tcp://192.168.1.120:2375",
"benefit_address": "0x84A3175be614F5886f99Da506dF08682DF530739", "benefit_address": "0x84A3175be614F5886f99Da506dF08682DF530739",
"node_manager_num": 1,
"heart_response": 30, "heart_response": 30,
"task_validator_time": 1, "task_validator_time": 1,
"container_num": 1 "container_num": 1
......
package controllers package controllers
import "example.com/m/nm" import (
"example.com/m/nm"
nodeManagerV1 "github.com/odysseus/odysseus-protocol/gen/proto/go/nodemanager/v1"
)
type NodeController struct { type NodeController struct {
BaseController BaseController
...@@ -10,7 +13,15 @@ func (c *NodeController) SetNmSeed() { ...@@ -10,7 +13,15 @@ func (c *NodeController) SetNmSeed() {
c.ResponseInfo(200, "sign successful", "") c.ResponseInfo(200, "sign successful", "")
} }
// AddNodeManager registers a node manager with the nm package and then
// acknowledges the request.
//
// NOTE(review): the public key and endpoint are hard-coded placeholder values
// rather than being read from the request - presumably a debugging stub;
// confirm before relying on this route.
func (c *NodeController) AddNodeManager() {
	nodeManager := &nodeManagerV1.NodeManagerInfo{
		Publickey: "public",
		Endpoint:  "http://localhost",
	}
	nm.AddNodeManager(nodeManager)
	// The sibling handlers SetNmSeed and GetNodeManager both finish by calling
	// ResponseInfo; without it the caller receives no acknowledgement.
	c.ResponseInfo(200, "add node manager successful", "")
}
func (c *NodeController) GetNodeManager() { func (c *NodeController) GetNodeManager() {
manager := nm.GetNodeManager() manager := nm.GetNodeManagers()
c.ResponseInfo(200, "sign successful", manager) c.ResponseInfo(200, "sign successful", manager)
} }
...@@ -14,41 +14,27 @@ import ( ...@@ -14,41 +14,27 @@ import (
"time" "time"
) )
// NodeManager pairs a node manager's description with the bookkeeping flags
// this package keeps for it.
type NodeManager struct {
// Info is the manager description; its Publickey and Endpoint fields are read
// by inputNodeManagerChan when a client is created for this manager.
Info *nodeManagerV1.NodeManagerInfo
// IsUsed is set to true by inputNodeManagerChan once a client for this
// manager has been sent on nodeManagerChan.
IsUsed bool
// IsExist is true when the entry is created and is set to false by
// DelNodeManager; entries with IsExist == false are skipped when managers
// are selected for connection.
IsExist bool
}
// 指定远程 Docker 服务的地址 // 指定远程 Docker 服务的地址
var ( var (
nodeManagers []*nodeManagerV1.NodeManagerInfo isInit = true
usedNodeManager []*models.NodeManagerClient nodeManagerArr []*NodeManager
nodeManagerIsUsed map[string]bool usedNodeManagerClient []*models.NodeManagerClient
nodeManagerIsDel map[string]bool nodeManagerChan chan *models.NodeManagerClient
nodeManagerChan chan *models.NodeManagerClient nodeManagerMsgChan chan *nodeManagerV1.ManagerMessage
nodeManagerMsgChan chan *nodeManagerV1.ManagerMessage
) )
func init() { func init() {
usedNodeManager = make([]*models.NodeManagerClient, 0) nodeManagerArr = make([]*NodeManager, 0)
nodeManagerIsUsed = make(map[string]bool, 0) usedNodeManagerClient = make([]*models.NodeManagerClient, 0)
nodeManagerIsDel = make(map[string]bool, 0)
nodeManagerChan = make(chan *models.NodeManagerClient, 0) nodeManagerChan = make(chan *models.NodeManagerClient, 0)
nodeManagerMsgChan = make(chan *nodeManagerV1.ManagerMessage, 0) nodeManagerMsgChan = make(chan *nodeManagerV1.ManagerMessage, 0)
seed := conf.GetConfig().NmSeed
log.Info("Nm seed url:", seed)
seedServiceClient := operate.ConnNmGrpc(seed)
if seedServiceClient == nil {
panic("Dial nm seed service client failed")
}
ctx, cancel := context.WithTimeout(context.Background(), time.Second*10)
defer cancel()
list, err := seedServiceClient.ManagerList(ctx, &nodeManagerV1.ManagerListRequest{}, grpc.EmptyCallOption{})
if err != nil {
panic(fmt.Sprintf("Get manager list failed : %s", err.Error()))
}
if list.GetManagers() == nil || len(list.GetManagers()) == 0 {
panic("Get manager list failed,the manager list is nil")
}
for _, node := range list.GetManagers() {
nodeManagers = append(nodeManagers, node)
nodeManagerIsDel[node.Publickey] = false
}
} }
func StartMonitor() { func StartMonitor() {
...@@ -58,28 +44,57 @@ func StartMonitor() { ...@@ -58,28 +44,57 @@ func StartMonitor() {
panic("Docker client is not healthy") panic("Docker client is not healthy")
} }
go monitorNodeManagerSeed()
go monitorWorker(dockerOp) go monitorWorker(dockerOp)
go monitorModelInfo(dockerOp) go monitorModelInfo(dockerOp)
for _, manager := range nodeManagers { for isInit {
}
connectNodeManagerCount := 0
for _, manager := range nodeManagerArr {
if !manager.IsExist {
continue
}
// TODO: 需要对索引进行一定的规则判断,随机选择其中的nodeManager进行链接 // TODO: 需要对索引进行一定的规则判断,随机选择其中的nodeManager进行链接
if int64(connectNodeManagerCount) == conf.GetConfig().NodeManagerNum {
break
}
isSuccess := inputNodeManagerChan(manager) isSuccess := inputNodeManagerChan(manager)
if !isSuccess { if !isSuccess {
panic("Init input node manager chan failed") panic("Init input node manager chan failed")
} }
connectNodeManagerCount++
} }
log.Info("Monitoring node manager client thread start......") log.Info("Monitoring node manager client thread start......")
ticker := time.NewTicker(time.Millisecond * 5)
for { for {
for i, managerClient := range usedNodeManager { select {
if !managerClient.GetStatus() && !nodeManagerIsDel[managerClient.PublicKey] { case <-ticker.C:
// TODO: 需要对索引进行一定的规则判断,随机选择其中的nodeManager进行链接 for _, managerClient := range usedNodeManagerClient {
nodeManagerInfo := nodeManagers[i] if !managerClient.GetStatus() {
if !nodeManagerIsUsed[nodeManagerInfo.Publickey] && !nodeManagerIsDel[managerClient.PublicKey] { manager := getNodeManager(managerClient.Endpoint)
isSuccess := inputNodeManagerChan(nodeManagerInfo) if manager == nil {
if isSuccess { log.Warn("The managerClient is not exist:", managerClient.Endpoint)
nodeManagerIsDel[managerClient.PublicKey] = true continue
break }
// TODO: 重试连接三次
isSuccess := inputNodeManagerChan(manager)
if !isSuccess {
unUsedNodeManager := getUnUsedNodeManager()
if unUsedNodeManager == nil || len(unUsedNodeManager) == 0 {
break
}
for _, nodeManager := range unUsedNodeManager {
isSuccess := inputNodeManagerChan(nodeManager)
if !isSuccess {
break
}
}
} }
} }
} }
...@@ -87,21 +102,67 @@ func StartMonitor() { ...@@ -87,21 +102,67 @@ func StartMonitor() {
} }
} }
func GetNodeManager() map[*nodeManagerV1.NodeManagerInfo]bool { func getUnUsedNodeManager() []*NodeManager {
res := make(map[*nodeManagerV1.NodeManagerInfo]bool, 0) res := make([]*NodeManager, 0)
for _, manager := range nodeManagers { for _, manager := range nodeManagerArr {
res[manager] = nodeManagerIsUsed[manager.Publickey] if !manager.IsUsed && manager.IsExist {
res = append(res, manager)
}
} }
return res return res
} }
// monitorNodeManagerSeed periodically asks the configured seed service for its
// list of node managers and records the ones not yet tracked in
// nodeManagerArr. It polls once per second until the first non-empty list has
// been received (which clears isInit) and every ten minutes after each
// successful poll. It never returns.
//
// It panics if the seed service cannot be dialled or the list request fails.
func monitorNodeManagerSeed() {
	ticker := time.NewTicker(time.Second * 1)
	defer ticker.Stop()
	for range ticker.C {
		seed := conf.GetConfig().NmSeed
		log.Info("Nm seed url:", seed)
		seedServiceClient := operate.ConnNmGrpc(seed)
		if seedServiceClient == nil {
			panic("Dial nm seed service client failed")
		}
		list, err := seedServiceClient.ManagerList(context.Background(), &nodeManagerV1.ManagerListRequest{}, grpc.EmptyCallOption{})
		if err != nil {
			panic("Nm seed seed service is dealing")
		}
		if len(list.GetManagers()) == 0 {
			continue
		}
		for _, node := range list.GetManagers() {
			// The seed returns its full list on every poll. Appending it
			// blindly would add a second entry with IsUsed == false for a
			// manager that is already tracked (and possibly connected).
			if isTrackedNodeManager(node.Endpoint) {
				continue
			}
			nodeManagerArr = append(nodeManagerArr, &NodeManager{
				Info:    node,
				IsUsed:  false,
				IsExist: true,
			})
		}
		isInit = false
		// Slow the existing ticker down instead of allocating a new one each
		// poll; the replaced tickers were never stopped.
		ticker.Reset(time.Minute * 10)
	}
}

// isTrackedNodeManager reports whether nodeManagerArr already holds an entry
// for endpoint that is still marked as existing. Entries flagged as deleted
// do not count, so a manager the seed lists again is re-added as before.
func isTrackedNodeManager(endpoint string) bool {
	for _, manager := range nodeManagerArr {
		if manager.IsExist && manager.Info.Endpoint == endpoint {
			return true
		}
	}
	return false
}
// GetNodeManagers returns the package-level slice of tracked node managers.
// The slice itself is returned, not a copy, so it shares its backing array
// with the monitor's state.
func GetNodeManagers() []*NodeManager {
	managers := nodeManagerArr
	return managers
}
func AddNodeManager(node *nodeManagerV1.NodeManagerInfo) { func AddNodeManager(node *nodeManagerV1.NodeManagerInfo) {
nodeManagers = append(nodeManagers, node) nodeManager := &NodeManager{
nodeManagerIsDel[node.Publickey] = false Info: node,
IsUsed: false,
IsExist: true,
}
nodeManagerArr = append(nodeManagerArr, nodeManager)
} }
func DelNodeManager(node *nodeManagerV1.NodeManagerInfo) { func DelNodeManager(node *nodeManagerV1.NodeManagerInfo) {
nodeManagerIsDel[node.Publickey] = true for _, manager := range nodeManagerArr {
if manager.Info.Endpoint == node.Endpoint {
manager.IsExist = false
}
}
} }
// monitorWorker 监听worker // monitorWorker 监听worker
...@@ -327,20 +388,29 @@ func buildParams(params ...interface{}) []interface{} { ...@@ -327,20 +388,29 @@ func buildParams(params ...interface{}) []interface{} {
return res return res
} }
func inputNodeManagerChan(manager *nodeManagerV1.NodeManagerInfo) bool { func inputNodeManagerChan(manager *NodeManager) bool {
n := &models.NodeManagerClient{ n := &models.NodeManagerClient{
PublicKey: manager.Publickey, PublicKey: manager.Info.Publickey,
Endpoint: manager.Endpoint, Endpoint: manager.Info.Endpoint,
Status: true, Status: true,
LastHeartTime: time.Now().UnixMilli(), LastHeartTime: time.Now().UnixMilli(),
} }
serviceClient := operate.ConnNmGrpc(manager.Endpoint) serviceClient := operate.ConnNmGrpc(manager.Info.Endpoint)
if serviceClient == nil { if serviceClient == nil {
return false return false
} }
n.Client = serviceClient n.Client = serviceClient
nodeManagerChan <- n nodeManagerChan <- n
usedNodeManager = append(usedNodeManager, n) usedNodeManagerClient = append(usedNodeManagerClient, n)
nodeManagerIsUsed[manager.Publickey] = true manager.IsUsed = true
return true return true
} }
// getNodeManager looks up a tracked node manager by endpoint. It returns the
// first entry of nodeManagerArr whose endpoint matches, regardless of its
// IsUsed/IsExist flags, or nil when there is no match.
func getNodeManager(endPoint string) *NodeManager {
	for idx := range nodeManagerArr {
		if candidate := nodeManagerArr[idx]; candidate.Info.Endpoint == endPoint {
			return candidate
		}
	}
	return nil
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment