Commit 9cc1f79b authored by duanjinfei's avatar duanjinfei

update conn nm failed count

parent ecccfb12
...@@ -26,6 +26,7 @@ type Config struct { ...@@ -26,6 +26,7 @@ type Config struct {
ApiUrl string `json:"api_url"` ApiUrl string `json:"api_url"`
ValidatorUrl string `json:"validator_url"` ValidatorUrl string `json:"validator_url"`
OssUrl string `json:"oss_url"` OssUrl string `json:"oss_url"`
AllowConnNmCount int `json:"allow_conn_nm_count"`
} }
var _cfg *Config = nil var _cfg *Config = nil
......
...@@ -7,5 +7,6 @@ ...@@ -7,5 +7,6 @@
"container_num": 1, "container_num": 1,
"chain_id": 100, "chain_id": 100,
"validator_url": "43.198.252.255:20011", "validator_url": "43.198.252.255:20011",
"oss_url": "http://43.198.252.255:13000/api/v1/upload" "oss_url": "http://43.198.252.255:13000/api/v1/upload",
"allow_conn_nm_count": 5
} }
\ No newline at end of file
...@@ -11,6 +11,7 @@ import ( ...@@ -11,6 +11,7 @@ import (
) )
func monitorNodeManagerSeed() { func monitorNodeManagerSeed() {
connectNmCumulativeCount := 0
ticker := time.NewTicker(time.Second * 1) ticker := time.NewTicker(time.Second * 1)
for { for {
select { select {
...@@ -19,22 +20,31 @@ func monitorNodeManagerSeed() { ...@@ -19,22 +20,31 @@ func monitorNodeManagerSeed() {
log.Info("Nm seed url:", seed) log.Info("Nm seed url:", seed)
seedServiceClient := operate.ConnNmGrpc(seed) seedServiceClient := operate.ConnNmGrpc(seed)
if seedServiceClient == nil { if seedServiceClient == nil {
panic("Dial nm seed service client failed") if connectNmCumulativeCount == conf.GetConfig().AllowConnNmCount {
panic("Dial nm seed service client failed")
}
connectNmCumulativeCount++
continue
} }
list, err := seedServiceClient.ManagerList(context.Background(), &nodeManagerV1.ManagerListRequest{}, grpc.EmptyCallOption{}) list, err := seedServiceClient.ManagerList(context.Background(), &nodeManagerV1.ManagerListRequest{}, grpc.EmptyCallOption{})
if err != nil { if err != nil {
panic("Nm seed seed service is dealing") if connectNmCumulativeCount == conf.GetConfig().AllowConnNmCount {
panic("Nm seed seed service is dealing")
}
connectNmCumulativeCount++
continue
} }
if list.GetManagers() == nil || len(list.GetManagers()) == 0 { if list.GetManagers() == nil || len(list.GetManagers()) == 0 {
log.Warn("Get managers is empty through Nm seed service")
continue continue
} }
for _, node := range list.GetManagers() { for _, node := range list.GetManagers() {
if isExistNodeManager(node.Endpoint) { if isExistNodeManager(node) {
continue continue
} }
nodeManagerArr = append(nodeManagerArr, &NodeManager{Info: node, IsUsed: false, IsExist: true}) nodeManagerArr = append(nodeManagerArr, &NodeManager{Info: node, IsUsed: false, IsExist: true})
} }
isInit = false isInit = true
ticker = time.NewTicker(time.Minute * 10) ticker = time.NewTicker(time.Minute * 10)
} }
} }
......
...@@ -44,9 +44,10 @@ func getUnUsedNodeManagers() []*NodeManager { ...@@ -44,9 +44,10 @@ func getUnUsedNodeManagers() []*NodeManager {
return res return res
} }
func isExistNodeManager(endPoint string) bool { func isExistNodeManager(nodeManager *nodeManagerV1.NodeManagerInfo) bool {
for _, manager := range nodeManagerArr { for _, manager := range nodeManagerArr {
if endPoint == manager.Info.Endpoint { if nodeManager.Endpoint == manager.Info.Endpoint {
manager.Info.Publickey = nodeManager.Publickey
return true return true
} }
} }
......
...@@ -17,7 +17,7 @@ import ( ...@@ -17,7 +17,7 @@ import (
// 指定远程 Docker 服务的地址 // 指定远程 Docker 服务的地址
var ( var (
isInit = true isInit = false
nodeManagerArr []*NodeManager nodeManagerArr []*NodeManager
usedNodeManagerClient []*models.NodeManagerClient usedNodeManagerClient []*models.NodeManagerClient
nodeManagerClientChan chan *models.NodeManagerClient nodeManagerClientChan chan *models.NodeManagerClient
...@@ -44,7 +44,7 @@ func StartMonitor() { ...@@ -44,7 +44,7 @@ func StartMonitor() {
go monitorModelInfo(dockerOp) go monitorModelInfo(dockerOp)
for isInit { for !isInit {
} }
var connectNodeManagerCount int64 = 0 var connectNodeManagerCount int64 = 0
......
...@@ -112,14 +112,11 @@ func TestTaskHandler_computeTaskHandler(t1 *testing.T) { ...@@ -112,14 +112,11 @@ func TestTaskHandler_computeTaskHandler(t1 *testing.T) {
for _, tt := range tests { for _, tt := range tests {
t1.Run(tt.name, func(t1 *testing.T) { t1.Run(tt.name, func(t1 *testing.T) {
t := &nm.TaskHandler{ t := &nm.TaskHandler{
Wg: tt.fields.wg, Wg: tt.fields.wg,
LruCache: tt.fields.lruCache, LruCache: tt.fields.lruCache,
DockerOp: tt.fields.DockerOp, DockerOp: tt.fields.DockerOp,
TaskMsg: tt.fields.TaskMsg, TaskMsg: tt.fields.TaskMsg,
TaskRespHeader: tt.fields.TaskRespHeader, HttpClient: tt.fields.HttpClient,
TaskRespBody: tt.fields.TaskRespBody,
TaskIsSuccess: tt.fields.TaskIsSuccess,
HttpClient: tt.fields.HttpClient,
} }
tt.fields.wg.Add(1) tt.fields.wg.Add(1)
t.ComputeTaskHandler(tt.args.taskMsg) t.ComputeTaskHandler(tt.args.taskMsg)
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment