Commit 8ca243e7 authored by duanjinfei's avatar duanjinfei

update nm protocol

parent ec926cab
...@@ -54,6 +54,7 @@ type NodeManagerClient struct { ...@@ -54,6 +54,7 @@ type NodeManagerClient struct {
Client nodeManagerV1.NodeManagerServiceClient Client nodeManagerV1.NodeManagerServiceClient
Status bool Status bool
IsDel bool IsDel bool
IsSelected bool
} }
func (n *NodeManagerClient) GetLastHeartTime() int64 { func (n *NodeManagerClient) GetLastHeartTime() int64 {
......
...@@ -35,7 +35,7 @@ func (o *RespMsgWorker) RegisterMsgResp(nodeManager *models.NodeManagerClient, w ...@@ -35,7 +35,7 @@ func (o *RespMsgWorker) RegisterMsgResp(nodeManager *models.NodeManagerClient, w
handler: handler, handler: handler,
params: params, params: params,
} }
log.Info("----------------register msg -------------") log.Info("----------------add msg response-------------")
} }
func (o *RespMsgWorker) SendMsg() { func (o *RespMsgWorker) SendMsg() {
...@@ -103,6 +103,35 @@ func SubmitResourceMapRes(params ...interface{}) *nodemanagerV1.WorkerMessage { ...@@ -103,6 +103,35 @@ func SubmitResourceMapRes(params ...interface{}) *nodemanagerV1.WorkerMessage {
return heartRes return heartRes
} }
func RegisterInfoResp(params ...interface{}) *nodemanagerV1.WorkerMessage {
log.Info("Register info response received params:", params)
nodeInfoRes := &nodemanagerV1.WorkerMessage{
Message: &nodemanagerV1.WorkerMessage_RegisteMessage{
RegisteMessage: &nodemanagerV1.RegisteMessage{
DeviceIp: conf.GetConfig().GetExternalIp(),
MinerPubkey: conf.GetConfig().SignPub,
},
},
}
log.Info("---------------------------------------Send register info msg ------------------------------------")
return nodeInfoRes
}
func NodeInfoResp(params ...interface{}) *nodemanagerV1.WorkerMessage {
log.Info("Node info response received params:", params)
nodeInfoRes := &nodemanagerV1.WorkerMessage{
Message: &nodemanagerV1.WorkerMessage_NodeInfo{
NodeInfo: &nodemanagerV1.NodeInfoResponse{
MinerPubkey: conf.GetConfig().SignPub,
BenefitAddress: conf.GetConfig().BenefitAddress,
DeviceIp: conf.GetConfig().GetExternalIp(),
},
},
}
log.Info("---------------------------------------Send node info msg ------------------------------------")
return nodeInfoRes
}
func DeviceInfoResp(params ...interface{}) *nodemanagerV1.WorkerMessage { func DeviceInfoResp(params ...interface{}) *nodemanagerV1.WorkerMessage {
log.Info("Device info response received params:", params) log.Info("Device info response received params:", params)
devices := make([]*nodemanagerV1.DeviceInfo, 0) devices := make([]*nodemanagerV1.DeviceInfo, 0)
...@@ -146,12 +175,9 @@ func DeviceInfoResp(params ...interface{}) *nodemanagerV1.WorkerMessage { ...@@ -146,12 +175,9 @@ func DeviceInfoResp(params ...interface{}) *nodemanagerV1.WorkerMessage {
deviceInfoRes := &nodemanagerV1.WorkerMessage{ deviceInfoRes := &nodemanagerV1.WorkerMessage{
Message: &nodemanagerV1.WorkerMessage_DeviceInfo{ Message: &nodemanagerV1.WorkerMessage_DeviceInfo{
DeviceInfo: &nodemanagerV1.DeviceInfoResponse{ DeviceInfo: &nodemanagerV1.DeviceInfoMessage{
MinerPubkey: conf.GetConfig().SignPub,
BenefitAddress: conf.GetConfig().BenefitAddress,
Devices: devices, Devices: devices,
DeviceSignature: []byte(""), DeviceSignature: []byte(""),
DeviceIps: []string{conf.GetConfig().GetExternalIp()},
}, },
}, },
} }
......
...@@ -62,12 +62,13 @@ func getNodeManager(endPoint string) *NodeManager { ...@@ -62,12 +62,13 @@ func getNodeManager(endPoint string) *NodeManager {
return nil return nil
} }
func inputNodeManagerChan(manager *NodeManager, nodeManagerClient *models.NodeManagerClient) bool { func inputNodeManagerChan(manager *NodeManager, nodeManagerClient *models.NodeManagerClient, isSelect bool) bool {
if nodeManagerClient == nil { if nodeManagerClient == nil {
nodeManagerClient = &models.NodeManagerClient{ nodeManagerClient = &models.NodeManagerClient{
PublicKey: manager.Info.Publickey, PublicKey: manager.Info.Publickey,
Endpoint: manager.Info.Endpoint, Endpoint: manager.Info.Endpoint,
Status: true, Status: true,
IsSelected: isSelect,
LastHeartTime: time.Now().UnixMilli(), LastHeartTime: time.Now().UnixMilli(),
} }
} }
......
...@@ -47,7 +47,9 @@ func StartMonitor() { ...@@ -47,7 +47,9 @@ func StartMonitor() {
for isInit { for isInit {
} }
connectNodeManagerCount := 0 var connectNodeManagerCount int64 = 0
selectClientRandomNum := utils.GenerateRandomNumber(conf.GetConfig().SignPrivateKey, conf.GetConfig().NodeManagerNum)
isSelect := false
for i := 0; i < len(nodeManagerArr); i++ { for i := 0; i < len(nodeManagerArr); i++ {
// TODO: 需要对索引进行一定的规则判断,随机选择其中的nodeManager进行链接 // TODO: 需要对索引进行一定的规则判断,随机选择其中的nodeManager进行链接
if int64(connectNodeManagerCount) == conf.GetConfig().NodeManagerNum { if int64(connectNodeManagerCount) == conf.GetConfig().NodeManagerNum {
...@@ -61,7 +63,12 @@ func StartMonitor() { ...@@ -61,7 +63,12 @@ func StartMonitor() {
continue continue
} }
if !manager.IsUsed { if !manager.IsUsed {
isSuccess := inputNodeManagerChan(manager, nil) if selectClientRandomNum.Int64() == connectNodeManagerCount {
isSelect = true
} else {
isSelect = false
}
isSuccess := inputNodeManagerChan(manager, nil, isSelect)
if !isSuccess { if !isSuccess {
log.Warn("Init input node manager chan failed") log.Warn("Init input node manager chan failed")
continue continue
...@@ -85,7 +92,7 @@ func StartMonitor() { ...@@ -85,7 +92,7 @@ func StartMonitor() {
} }
if !managerClient.IsDel { if !managerClient.IsDel {
// TODO: 重试连接三次 // TODO: 重试连接三次
isSuccess := inputNodeManagerChan(manager, managerClient) isSuccess := inputNodeManagerChan(manager, managerClient, managerClient.IsSelected)
log.Warn("Try to connect node manager client:", manager.Info.Endpoint) log.Warn("Try to connect node manager client:", manager.Info.Endpoint)
if isSuccess { if isSuccess {
log.Info("Connect node manager client success:", manager.Info.Endpoint) log.Info("Connect node manager client success:", manager.Info.Endpoint)
...@@ -98,10 +105,18 @@ func StartMonitor() { ...@@ -98,10 +105,18 @@ func StartMonitor() {
log.Warn("There is no node manager available at this time") log.Warn("There is no node manager available at this time")
break break
} }
if managerClient.IsSelected {
for _, client := range usedNodeManagerClient {
if client.Status && !client.IsDel {
client.IsSelected = true
break
}
}
}
for i := 0; i < len(unUsedNodeManagers); i++ { for i := 0; i < len(unUsedNodeManagers); i++ {
randomNum := utils.GenerateRandomNumber(conf.GetConfig().SignPrivateKey, int64(len(nodeManagerArr))) randomNum := utils.GenerateRandomNumber(conf.GetConfig().SignPrivateKey, int64(len(nodeManagerArr)))
unUsedManager := unUsedNodeManagers[randomNum.Int64()] unUsedManager := unUsedNodeManagers[randomNum.Int64()]
isSuccess := inputNodeManagerChan(unUsedManager, nil) isSuccess := inputNodeManagerChan(unUsedManager, nil, false)
if !isSuccess { if !isSuccess {
log.Warn("Connect unused node manager client error:", manager.Info.Endpoint) log.Warn("Connect unused node manager client error:", manager.Info.Endpoint)
break break
...@@ -137,6 +152,25 @@ func monitorWorker(op *operate.DockerOp) { ...@@ -137,6 +152,25 @@ func monitorWorker(op *operate.DockerOp) {
proofWorker := validator.NewProofWorker() proofWorker := validator.NewProofWorker()
// 主动上报发送设备信息
go func(isSelect bool) {
ticker := time.NewTicker(time.Millisecond)
isSend := false
for {
select {
case <-ticker.C:
if isSend {
return
}
ticker = time.NewTicker(time.Second * 20)
msgRespWorker.RegisterMsgResp(nodeManager, worker, RegisterInfoResp, nil)
msgRespWorker.RegisterMsgResp(nodeManager, worker, DeviceInfoResp, nil)
isSend = true
log.Info("------------------------Send once-off message ended------------------------")
}
}
}(nodeManager.IsSelected)
// 上报image信息 // 上报image信息
go reportModelInfo(nodeManager, worker, msgRespWorker, op) go reportModelInfo(nodeManager, worker, msgRespWorker, op)
log.Info("Report model info started") log.Info("Report model info started")
...@@ -295,13 +329,6 @@ func handlerMsg(nodeManager *models.NodeManagerClient, ...@@ -295,13 +329,6 @@ func handlerMsg(nodeManager *models.NodeManagerClient,
continue continue
} }
deviceMsg := rev.GetDeviceRequest()
if deviceMsg != nil {
msgRespWorker.RegisterMsgResp(nodeManager, worker, DeviceInfoResp, nil)
log.Info(deviceMsg)
continue
}
deviceUsageMsg := rev.GetDeviceUsage() deviceUsageMsg := rev.GetDeviceUsage()
if deviceUsageMsg != nil { if deviceUsageMsg != nil {
msgRespWorker.RegisterMsgResp(nodeManager, worker, DeviceUsageResp, nil) msgRespWorker.RegisterMsgResp(nodeManager, worker, DeviceUsageResp, nil)
...@@ -309,6 +336,13 @@ func handlerMsg(nodeManager *models.NodeManagerClient, ...@@ -309,6 +336,13 @@ func handlerMsg(nodeManager *models.NodeManagerClient,
continue continue
} }
nodeInfoMsg := rev.GetNodeInfoRequest()
if nodeInfoMsg != nil {
msgRespWorker.RegisterMsgResp(nodeManager, worker, NodeInfoResp, nil)
log.Info(nodeInfoMsg)
continue
}
statusReqMsg := rev.GetStatusRequest() statusReqMsg := rev.GetStatusRequest()
if statusReqMsg != nil { if statusReqMsg != nil {
msgRespWorker.RegisterMsgResp(nodeManager, worker, StatusResp, nil) msgRespWorker.RegisterMsgResp(nodeManager, worker, StatusResp, nil)
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment