Commit 6eeefaf6 authored by duanjinfei

update monitor nodemanager client

parent 6f6d3c34
@@ -4,10 +4,16 @@ import (
 	"example.com/m/log"
 	"example.com/m/nm"
 	"github.com/astaxie/beego"
+	_ "net/http/pprof"
 )
 
 func main() {
 	log.InitLog(log.LogConfig{Path: "logs", Level: "debug", Save: 3})
+	//go func() {
+	//	log.Println(http.ListenAndServe("localhost:6060", nil))
+	//}()
+	//runtime.SetBlockProfileRate(1)     // enable tracking of blocking operations (block profile)
+	//runtime.SetMutexProfileFraction(1) // enable tracking of mutex/lock contention (mutex profile)
 	nm.StartMonitor()
 	beego.Run()
 }
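For reference, a minimal sketch of enabling the profiling hooks that are left commented out above, assuming the stock net/http mux and the same localhost:6060 address used in the commented code:

package main

import (
	"log"
	"net/http"
	_ "net/http/pprof" // registers the /debug/pprof/* handlers on http.DefaultServeMux
	"runtime"
)

func main() {
	runtime.SetBlockProfileRate(1)     // record every blocking event (block profile)
	runtime.SetMutexProfileFraction(1) // record every mutex contention event (mutex profile)
	go func() {
		// serve the pprof handlers; visit http://localhost:6060/debug/pprof/
		log.Println(http.ListenAndServe("localhost:6060", nil))
	}()
	select {} // block forever; the real program runs nm.StartMonitor() and beego.Run() here
}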
@@ -32,7 +32,7 @@ type ModelInfo struct {
 	ImageName    string `json:"image_name"`
 	DiskSize     int64  `json:"disk_size"`
 	MemorySize   int64  `json:"memory_size"`
-	IsImageExist bool   `json:"is_image_delete"`
+	IsImageExist bool
 }
 
 type ComputeResult struct {
@@ -48,6 +48,7 @@ type NodeManagerClient struct {
 	Endpoint string
 	Client   nodeManagerV1.NodeManagerServiceClient
 	Status   bool
+	IsDel    bool
 }
 
 func (n *NodeManagerClient) GetLastHeartTime() int64 {
......
@@ -25,16 +25,15 @@ var (
 	isInit                = true
 	nodeManagerArr        []*NodeManager
 	usedNodeManagerClient []*models.NodeManagerClient
-	nodeManagerChan       chan *models.NodeManagerClient
+	nodeManagerClientChan chan *models.NodeManagerClient
 	nodeManagerMsgChan    chan *nodeManagerV1.ManagerMessage
 )
 
 func init() {
 	nodeManagerArr = make([]*NodeManager, 0)
 	usedNodeManagerClient = make([]*models.NodeManagerClient, 0)
-	nodeManagerChan = make(chan *models.NodeManagerClient, 0)
-	nodeManagerMsgChan = make(chan *nodeManagerV1.ManagerMessage, 0)
+	nodeManagerClientChan = make(chan *models.NodeManagerClient, 0)
+	nodeManagerMsgChan = make(chan *nodeManagerV1.ManagerMessage, 1000)
 }
 
 func StartMonitor() {
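The init hunk gives nodeManagerMsgChan a 1000-slot buffer while the client channel stays unbuffered. A small self-contained sketch of the difference, using a plain int channel rather than the project's types:

package main

import "fmt"

func main() {
	// Unbuffered: every send blocks until some receiver is ready.
	// Buffered (capacity 1000 in the commit): sends return immediately until the
	// buffer fills, so the gRPC receive loop is decoupled from slow message handlers.
	ch := make(chan int, 3)
	ch <- 1
	ch <- 2
	ch <- 3                       // still non-blocking
	fmt.Println(len(ch), cap(ch)) // 3 3; a fourth send would block until a receive
}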
@@ -51,7 +50,6 @@ func StartMonitor() {
 	go monitorModelInfo(dockerOp)
 
 	for isInit {
 	}
-
 	connectNodeManagerCount := 0
@@ -63,7 +61,7 @@ func StartMonitor() {
 		if int64(connectNodeManagerCount) == conf.GetConfig().NodeManagerNum {
 			break
 		}
-		isSuccess := inputNodeManagerChan(manager)
+		isSuccess := inputNodeManagerChan(manager, nil)
 		if !isSuccess {
 			panic("Init input node manager chan failed")
 		}
@@ -82,15 +80,19 @@ func StartMonitor() {
 			log.Warn("The managerClient is not exist:", managerClient.Endpoint)
 			continue
 		}
-		// TODO: retry the connection three times
-		isSuccess := inputNodeManagerChan(manager)
+		isSuccess := false
+		if !managerClient.IsDel {
+			// TODO: retry the connection three times
+			isSuccess = inputNodeManagerChan(manager, managerClient)
+		}
 		if !isSuccess {
+			managerClient.IsDel = true
 			unUsedNodeManager := getUnUsedNodeManager()
 			if unUsedNodeManager == nil || len(unUsedNodeManager) == 0 {
 				break
 			}
 			for _, nodeManager := range unUsedNodeManager {
-				isSuccess := inputNodeManagerChan(nodeManager)
+				isSuccess := inputNodeManagerChan(nodeManager, nil)
 				if !isSuccess {
 					break
 				}
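The "retry the connection three times" TODO left in this hunk could take roughly the following shape. This is only a hypothetical sketch, and the helper and its parameters are not part of the commit:

package main

import (
	"fmt"
	"time"
)

// retryConnect is a hypothetical helper for the TODO above: attempt a dial up to
// `attempts` times before giving up and letting the caller mark the client deleted.
func retryConnect(attempts int, delay time.Duration, dial func() bool) bool {
	for i := 0; i < attempts; i++ {
		if dial() {
			return true
		}
		time.Sleep(delay)
	}
	return false
}

func main() {
	ok := retryConnect(3, 100*time.Millisecond, func() bool {
		// in the real code this would call inputNodeManagerChan(manager, managerClient)
		fmt.Println("dial attempt")
		return false
	})
	fmt.Println("connected:", ok) // false -> caller would set managerClient.IsDel = true
}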
@@ -112,6 +114,15 @@ func getUnUsedNodeManager() []*NodeManager {
 	return res
 }
 
+func isExistNodeManager(endPoint string) bool {
+	for _, manager := range nodeManagerArr {
+		if endPoint == manager.Info.Endpoint {
+			return true
+		}
+	}
+	return false
+}
+
 func monitorNodeManagerSeed() {
 	ticker := time.NewTicker(time.Second * 1)
 	for {
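The new isExistNodeManager does a linear scan over nodeManagerArr, which is fine at these sizes; for reference, a set keyed by endpoint gives the same de-duplication in O(1). This is a hypothetical alternative, not what the commit does:

package main

import "fmt"

func main() {
	// Hypothetical endpoint set mirroring the check isExistNodeManager performs by scanning.
	seen := map[string]struct{}{}
	endpoints := []string{"10.0.0.1:9001", "10.0.0.1:9001", "10.0.0.2:9001"}
	for _, ep := range endpoints {
		if _, ok := seen[ep]; ok {
			continue // already known, mirrors the `continue` added in monitorNodeManagerSeed
		}
		seen[ep] = struct{}{}
		fmt.Println("register", ep)
	}
}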
@@ -131,12 +142,10 @@ func monitorNodeManagerSeed() {
 				continue
 			}
 			for _, node := range list.GetManagers() {
-				nodeManager := &NodeManager{
-					Info:    node,
-					IsUsed:  false,
-					IsExist: true,
+				if isExistNodeManager(node.Endpoint) {
+					continue
 				}
-				nodeManagerArr = append(nodeManagerArr, nodeManager)
+				nodeManagerArr = append(nodeManagerArr, &NodeManager{Info: node, IsUsed: false, IsExist: true})
 			}
 			isInit = false
 			ticker = time.NewTicker(time.Minute * 10)
@@ -170,11 +179,9 @@ func monitorWorker(op *operate.DockerOp) {
 	log.Info("Monitoring worker thread start......")
 	for {
 		select {
-		case managerClient := <-nodeManagerChan:
+		case managerClient := <-nodeManagerClientChan:
 			go func(nodeManager *models.NodeManagerClient) {
-				ctx, cancel := context.WithTimeout(context.Background(), time.Second*10000)
-				defer cancel()
-				worker, err := nodeManager.Client.RegisterWorker(ctx, grpc.EmptyCallOption{})
+				worker, err := nodeManager.Client.RegisterWorker(context.Background(), grpc.EmptyCallOption{})
 				if err != nil {
 					log.Error("Registration worker failed", err)
 					nodeManager.UpdateStatus(false)
@@ -182,10 +189,12 @@ func monitorWorker(op *operate.DockerOp) {
 				}
 
 				msgRespWorker := NewMsgRespWorker()
-				go msgRespWorker.SendMsg()
+				for i := 0; i < 3; i++ {
+					go msgRespWorker.SendMsg()
+				}
 
 				taskMsgWorker := NewTaskWorker(op)
-				taskMsgWorker.HandlerTask(3)
+				taskMsgWorker.HandlerTask(4)
 
 				proofWorker := validator.NewProofWorker()
@@ -193,13 +202,13 @@ func monitorWorker(op *operate.DockerOp) {
 				go reportModelInfo(nodeManager, worker, msgRespWorker, op)
 
 				// proof storage
-				//go proofWorker.ProofStorage()
+				go proofWorker.ProofStorage()
 
 				// proof submission
 				//go proofWorker.CommitWitness()
 
-				// handle other messages
-				for i := 0; i < 3; i++ {
+				// handle messages
+				for i := 0; i < 5; i++ {
 					go handlerMsg(nodeManager, worker, msgRespWorker, taskMsgWorker, proofWorker)
 				}
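This hunk raises the task handler count to 4 and the handlerMsg goroutines to 5, all draining the shared buffered message channel. A generic, self-contained sketch of that fan-out pattern, using plain strings instead of ManagerMessage:

package main

import (
	"fmt"
	"sync"
)

func main() {
	// One producer feeds a buffered channel; several consumers drain it concurrently,
	// the same shape as the 5 handlerMsg goroutines reading nodeManagerMsgChan.
	msgs := make(chan string, 1000)
	var wg sync.WaitGroup
	for i := 0; i < 5; i++ {
		wg.Add(1)
		go func(id int) {
			defer wg.Done()
			for m := range msgs {
				fmt.Println("handler", id, "got", m)
			}
		}(i)
	}
	for i := 0; i < 10; i++ {
		msgs <- fmt.Sprintf("msg-%d", i)
	}
	close(msgs)
	wg.Wait()
}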
@@ -224,7 +233,10 @@ func monitorWorker(op *operate.DockerOp) {
 						msgRespWorker.RegisterMsgResp(nodeManager, worker, GoodbyeResp, params)
 						return
 					}
+					log.Info("---------------------Received message---------------------")
 					nodeManagerMsgChan <- rev
+					log.Info("---------------------Send Received message success---------------------")
+					continue
 				}
 			}(managerClient)
 		}
@@ -291,90 +303,99 @@ func handlerMsg(nodeManager *models.NodeManagerClient,
 	for {
 		select {
 		case rev := <-nodeManagerMsgChan:
+			{
 			heartbeatReq := rev.GetHeartbeatRequest()
 			if heartbeatReq != nil {
 				nodeManager.UpdateLastHeartTime(int64(heartbeatReq.Timestamp))
 				params := buildParams(heartbeatReq.Timestamp)
 				msgRespWorker.RegisterMsgResp(nodeManager, worker, HeartbeatResp, params)
-				log.Info(heartbeatReq)
+				log.Info("-------------Heart beat req:-------------", heartbeatReq)
+				continue
 			}
 
 			taskMsg := rev.GetPushTaskMessage()
 			if taskMsg != nil {
 				go func(msgRespWorker *RespMsgWorker,
-					taskMsgWorker *TaskHandler) {
+					taskMsgWorker *TaskHandler, taskMsg *nodeManagerV1.PushTaskMessage) {
 					if !taskMsgWorker.DockerOp.IsHealthy {
 						params := buildParams(taskMsgWorker.DockerOp.Reason)
 						msgRespWorker.RegisterMsgResp(nodeManager, worker, GoodbyeResp, params)
 						return
 					}
 					taskMsgWorker.Wg.Add(1)
 					taskMsgWorker.TaskMsg <- taskMsg
 					taskMsgWorker.Wg.Wait()
 					taskResHeader := taskMsgWorker.TaskRespHeader[taskMsg.TaskUuid]
 					taskResBody := taskMsgWorker.TaskRespBody[taskMsg.TaskUuid]
 					isSuccess := taskMsgWorker.TaskIsSuccess[taskMsg.TaskUuid]
 					containerSign := taskMsgWorker.DockerOp.GetContainerSign(taskMsg, taskResBody)
 					if containerSign == nil || len(containerSign) == 0 {
 						log.Error("Container signing failed................")
 						isSuccess = false
 					}
 					reqHash, respHash, minerSign := taskMsgWorker.GetMinerSign(taskMsg, taskResBody)
 					params := buildParams(taskMsg.TaskUuid, containerSign, minerSign, taskResHeader, taskResBody, isSuccess)
 					taskMsgWorker.LruCache.Add(taskMsg.TaskUuid+models.ContainerSign, containerSign)
 					taskMsgWorker.LruCache.Add(taskMsg.TaskUuid+models.MinerSign, minerSign)
 					taskMsgWorker.LruCache.Add(taskMsg.TaskUuid+models.ReqHash, reqHash)
 					taskMsgWorker.LruCache.Add(taskMsg.TaskUuid+models.RespHash, respHash)
 					msgRespWorker.RegisterMsgResp(nodeManager, worker, SubmitResultResp, params)
-					log.Info(taskMsg)
-				}(msgRespWorker, taskMsgWorker)
+					log.Info("--------------taskMsg--------------:", taskMsg)
+				}(msgRespWorker, taskMsgWorker, taskMsg)
+				continue
 			}
 
 			nmSignMsg := rev.GetProofTaskResult()
 			if nmSignMsg != nil {
 				containerSign, ok := taskMsgWorker.LruCache.Get(nmSignMsg.TaskUuid + models.ContainerSign)
 				if !ok {
 
 				}
 				minerSign, ok := taskMsgWorker.LruCache.Get(nmSignMsg.TaskUuid + models.MinerSign)
 				if !ok {
 
 				}
 				reqHash, ok := taskMsgWorker.LruCache.Get(nmSignMsg.TaskUuid + models.ReqHash)
 				if !ok {
 
 				}
 				respHash, ok := taskMsgWorker.LruCache.Get(nmSignMsg.TaskUuid + models.RespHash)
 				if !ok {
 
 				}
 				proofWorker.ProductProof(nmSignMsg.TaskUuid, nmSignMsg.Workload, reqHash.([]byte), respHash.([]byte), containerSign.([]byte), minerSign.([]byte), nmSignMsg.ManagerSignature)
 				log.Info(nmSignMsg)
+				continue
 			}
 
 			deviceMsg := rev.GetDeviceRequest()
 			if deviceMsg != nil {
 				msgRespWorker.RegisterMsgResp(nodeManager, worker, DeviceInfoResp, nil)
 				log.Info(deviceMsg)
+				continue
 			}
 
 			deviceUsageMsg := rev.GetDeviceUsage()
 			if deviceUsageMsg != nil {
 				msgRespWorker.RegisterMsgResp(nodeManager, worker, DeviceUsageResp, nil)
 				log.Info(deviceUsageMsg)
+				continue
 			}
 
 			statusReqMsg := rev.GetStatusRequest()
 			if statusReqMsg != nil {
 				msgRespWorker.RegisterMsgResp(nodeManager, worker, StatusResp, nil)
 				log.Info(statusReqMsg)
+				continue
 			}
 
 			goodByeMsg := rev.GetGoodbyeMessage()
 			if goodByeMsg != nil {
 				reason := goodByeMsg.GetReason()
 				log.Infof("Server endpoint:%s , good bye reason : %s", nodeManager.Endpoint, reason)
 				nodeManager.UpdateStatus(false)
+				continue
 			}
+			}
 		}
 	}
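The proof branch above still leaves the four `if !ok { }` blocks empty and then type-asserts the cached values, so if any LRU entry has been evicted, `reqHash.([]byte)` and friends would panic on a nil interface. A defensive sketch, with a plain map standing in for the project's LRU cache:

package main

import "fmt"

func main() {
	cache := map[string]interface{}{"task-1:reqHash": []byte{0x01}}
	v, ok := cache["task-1:respHash"]
	if !ok {
		// bail out instead of falling through to a type assertion on a missing value
		fmt.Println("resp hash missing from cache; skip ProductProof for this task")
		return
	}
	respHash := v.([]byte) // safe only after the ok check
	fmt.Println(respHash)
}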
@@ -388,20 +409,23 @@ func buildParams(params ...interface{}) []interface{} {
 	return res
 }
 
-func inputNodeManagerChan(manager *NodeManager) bool {
-	n := &models.NodeManagerClient{
-		PublicKey:     manager.Info.Publickey,
-		Endpoint:      manager.Info.Endpoint,
-		Status:        true,
-		LastHeartTime: time.Now().UnixMilli(),
+func inputNodeManagerChan(manager *NodeManager, nodeManagerClient *models.NodeManagerClient) bool {
+	if nodeManagerClient == nil {
+		nodeManagerClient = &models.NodeManagerClient{
+			PublicKey:     manager.Info.Publickey,
+			Endpoint:      manager.Info.Endpoint,
+			Status:        true,
+			LastHeartTime: time.Now().UnixMilli(),
+		}
 	}
 	serviceClient := operate.ConnNmGrpc(manager.Info.Endpoint)
 	if serviceClient == nil {
 		return false
 	}
-	n.Client = serviceClient
-	nodeManagerChan <- n
-	usedNodeManagerClient = append(usedNodeManagerClient, n)
+	nodeManagerClient.Status = true
+	nodeManagerClient.Client = serviceClient
+	nodeManagerClientChan <- nodeManagerClient
+	usedNodeManagerClient = append(usedNodeManagerClient, nodeManagerClient)
 	manager.IsUsed = true
 	return true
 }
......
@@ -93,19 +93,6 @@ func (t *TaskHandler) ComputeTaskHandler(taskMsg *nodeManagerV1.PushTaskMessage)
 		log.Errorf("failed to unmarshal task cmd: %s", err.Error())
 		return
 	}
 
-	var externalPort int64
-	for {
-		// seed the RNG so each run produces a different sequence of random numbers
-		rand.Seed(time.Now().UnixNano())
-		// draw a random port in the range [10000, 20000]
-		externalPort = rand.Int63n(10001) + 10000
-		fmt.Println("t.DockerOp.UsedExternalPort[externalPort]:", t.DockerOp.UsedExternalPort[externalPort])
-		if t.DockerOp.UsedExternalPort[externalPort] {
-			continue
-		}
-		break
-	}
-	taskCmd.DockerCmd.HostPort = strconv.FormatInt(externalPort, 10)
-
 	images, err := t.DockerOp.PsImages()
 	if err != nil {
@@ -127,10 +114,8 @@ func (t *TaskHandler) ComputeTaskHandler(taskMsg *nodeManagerV1.PushTaskMessage)
 		}
 		log.Println(image.ID)
 	}
-
 	containers := t.DockerOp.ListContainer()
-
 	isImageRunExist := false
 	for _, container := range containers {
 		if container.ImageID == imageId {
 			taskCmd.ApiUrl = fmt.Sprintf(taskCmd.ApiUrl, container.Ports[0].PublicPort)
@@ -139,6 +124,19 @@ func (t *TaskHandler) ComputeTaskHandler(taskMsg *nodeManagerV1.PushTaskMessage)
 		}
 	}
 	if !isImageRunExist {
+		var externalPort int64
+		for {
+			// seed the RNG so each run produces a different sequence of random numbers
+			rand.Seed(time.Now().UnixNano())
+			// draw a random port in the range [10000, 20000]
+			externalPort = rand.Int63n(10001) + 10000
+			log.Info("DockerOp UsedExternalPort :", t.DockerOp.UsedExternalPort[externalPort])
+			if t.DockerOp.UsedExternalPort[externalPort] {
+				continue
+			}
+			break
+		}
+		taskCmd.DockerCmd.HostPort = strconv.FormatInt(externalPort, 10)
 		taskCmd.ApiUrl = fmt.Sprintf(taskCmd.ApiUrl, externalPort)
 		if int64(len(containers)) == conf.GetConfig().ContainerNum {
 			//todo: to be decided; need to pick which container to stop based on weight
......
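The port-selection loop is moved inside the `!isImageRunExist` branch so a port is only drawn when a new container will actually be started. For reference, the same logic as a small standalone function; note that rand.Seed is unnecessary on Go 1.20+, and reseeding inside the loop was never required:

package main

import (
	"fmt"
	"math/rand"
)

// pickExternalPort sketches the loop above: draw ports in [10000, 20000] until one
// is not marked as used. `used` stands in for t.DockerOp.UsedExternalPort.
func pickExternalPort(used map[int64]bool) int64 {
	for {
		p := rand.Int63n(10001) + 10000
		if !used[p] {
			return p
		}
	}
}

func main() {
	used := map[int64]bool{10500: true}
	fmt.Println(pickExternalPort(used))
}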
@@ -21,8 +21,8 @@ type ProofWorker struct {
 
 func NewProofWorker() *ProofWorker {
 	return &ProofWorker{
-		productProofChan: make(chan *witnessV1.Proof, 0),
-		consumeProofChan: make(chan []*witnessV1.Proof, 0),
+		productProofChan: make(chan *witnessV1.Proof, 1000),
+		consumeProofChan: make(chan []*witnessV1.Proof, 1000),
 		isCommitProof:    make(map[string]bool, 0),
 	}
 }
......