Commit cd5ce006 authored by duanjinfei

update nm handler

parent ea57f47b
package nm
package largeModel
import (
"encoding/json"
@@ -6,20 +6,30 @@ import (
"example.com/m/log"
"example.com/m/models"
"example.com/m/operate"
nodeManagerV1 "github.com/odysseus/odysseus-protocol/gen/proto/go/nodemanager/v1"
"io"
"net/http"
"strings"
"time"
)
func monitorModelInfo(dockerOp *operate.DockerOp) {
client := &http.Client{}
type ModelHandler struct {
dockerOp *operate.DockerOp
client *http.Client
}
func NewModelHandler(dockerOp *operate.DockerOp) *ModelHandler {
return &ModelHandler{
dockerOp: dockerOp,
client: &http.Client{},
}
}
func (m *ModelHandler) MonitorModelInfo() {
ticker := time.NewTicker(time.Second * 10)
for {
select {
case <-ticker.C:
modelResp, err := client.Get(conf.GetConfig().ApiUrl)
modelResp, err := m.client.Get(conf.GetConfig().ApiUrl)
if err != nil {
log.Error("Error getting model info from client failed:", err)
continue
@@ -44,7 +54,7 @@ func monitorModelInfo(dockerOp *operate.DockerOp) {
continue
}
modelInfosResp := resp.Data
imageNameMap, err := dockerOp.PsImageNameMap()
imageNameMap, err := m.dockerOp.PsImageNameMap()
if err != nil {
log.Error("Docker op ps images failed:", err)
continue
@@ -65,7 +75,7 @@ func monitorModelInfo(dockerOp *operate.DockerOp) {
// TODO: if resources are sufficient
if isPull && modelInfo.PublishStatus == models.ModelPublishStatusYes {
log.WithField("model image name", modelInfo.ImageName).Info("pulling image")
go dockerOp.PullImage(modelInfo)
go m.dockerOp.PullImage(modelInfo)
}
} else {
log.WithField("name", modelInfo.ImageName).Info("The image name is already")
@@ -74,33 +84,17 @@ func monitorModelInfo(dockerOp *operate.DockerOp) {
maxLong = modelInfo.TaskId
}
}
dockerOp.SignApi[modelInfo.ImageName] = modelInfo.SignUrl
m.dockerOp.SignApi[modelInfo.ImageName] = modelInfo.SignUrl
}
dockerOp.ModelsInfo = modelInfosResp
m.dockerOp.ModelsInfo = modelInfosResp
reportTaskIds = append(reportTaskIds, maxLong)
dockerOp.ReportTaskIds = reportTaskIds
dockerOp.ModelTaskIdIndexesChan <- reportTaskIds
m.dockerOp.ReportTaskIds = reportTaskIds
m.dockerOp.ModelTaskIdIndexesChan <- reportTaskIds
ticker = time.NewTicker(time.Minute * 10)
}
}
}
func reportModelInfo(nodeManager *models.NodeManagerClient,
worker nodeManagerV1.NodeManagerService_RegisterWorkerClient,
msgRespWorker *RespMsgWorker, dockerOp *operate.DockerOp) {
for {
select {
case taskIdIndexes := <-dockerOp.ModelTaskIdIndexesChan:
if !nodeManager.Status {
log.WithField("endpoint", nodeManager.Endpoint).Error("Node manager is down , stop report model info")
return
}
params := buildParams(taskIdIndexes)
msgRespWorker.RegisterMsgResp(nodeManager, worker, SubmitResourceMapRes, params)
}
}
}
func isResourceEnough(modelInfo *models.ModelInfo) bool {
return true
}
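For context, the handler above is wired up from StartMonitor later in this commit. A minimal sketch of that wiring, with the DockerOp construction assumed and everything else mirroring the call sites below:
// Sketch only: how StartMonitor is expected to use the new largeModel.ModelHandler.
dockerOp := operate.NewDockerOp() // assumed constructor; the commit only shows an existing dockerOp
modelHandler := largeModel.NewModelHandler(dockerOp)
go modelHandler.MonitorModelInfo() // polls the model API every 10s (then every 10min) and pulls published images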
@@ -10,6 +10,7 @@ const (
ContentType = "type"
RedirectCode = 303
UseFileCache = "USE-FILE-CACHE"
UseRedirect = "USE-REDIRECT"
Prefer = "Prefer"
Async = "respond-async"
HealthCheckAPI = "/health-check"
......
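These header constants are consumed by ComputeTaskHandler further down in this commit: USE-REDIRECT makes the worker answer with a 303 and a Location header pointing at the OSS URI, while USE-FILE-CACHE toggles the file cache. A hedged sketch of how a task's headers might be populated; the map layout simply mirrors the taskParam.Headers loop in ComputeTaskHandler:
// Sketch only: the handler reads the first element of each value slice ("true"/"false").
headers := map[string][]string{
	models.UseRedirect:  {"true"},  // ask for a 303 redirect to the OSS URI instead of the response body
	models.UseFileCache: {"false"}, // disable the file cache for this task
}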
@@ -64,7 +64,7 @@ func getNodeManager(endPoint string) *NodeManager {
return nil
}
func inputNodeManagerChan(manager *NodeManager, nodeManagerClient *models.NodeManagerClient, isSelect bool) bool {
func inputNodeManagerChan(manager *NodeManager, nodeManagerClient *models.NodeManagerClient, isSelect bool, monitorNm *MonitorNm) bool {
if nodeManagerClient == nil {
nodeManagerClient = &models.NodeManagerClient{
PublicKey: manager.Info.Publickey,
@@ -82,7 +82,7 @@ func inputNodeManagerChan(manager *NodeManager, nodeManagerClient *models.NodeMa
}
nodeManagerClient.Status = true
nodeManagerClient.Client = serviceClient
nodeManagerClientChan <- nodeManagerClient
monitorNm.NodeManagerClientChan <- nodeManagerClient
manager.IsUsed = true
return true
}
package nm
import (
"context"
"example.com/m/conf"
"example.com/m/log"
"example.com/m/models"
"example.com/m/operate"
"example.com/m/validator"
nodeManagerV1 "github.com/odysseus/odysseus-protocol/gen/proto/go/nodemanager/v1"
"google.golang.org/grpc"
"time"
)
type MonitorNm struct {
NodeManagerClientChan chan *models.NodeManagerClient
NodeManagerMsgChan chan *nodeManagerV1.ManagerMessage
DockerOp *operate.DockerOp
IsInit bool
}
func NewMonitorNm(DockerOp *operate.DockerOp) *MonitorNm {
return &MonitorNm{
NodeManagerClientChan: make(chan *models.NodeManagerClient, 10),
NodeManagerMsgChan: make(chan *nodeManagerV1.ManagerMessage, 1000),
DockerOp: DockerOp,
IsInit: false,
}
}
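A short usage sketch, mirroring how StartMonitor wires this type up later in the commit (IsInit is the readiness flag the caller spins on):
monitorNm := NewMonitorNm(dockerOp)
go monitorNm.monitorNodeManagerSeed() // discover node managers via the seed service
go monitorNm.monitorNmClient()        // register a worker for each client pushed on NodeManagerClientChan
for !monitorNm.IsInit {
	// busy-wait until the first manager list has been fetched, as StartMonitor does
}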
func (m *MonitorNm) monitorNmClient() {
log.Info("Monitoring worker thread start......")
for {
select {
case managerClient := <-m.NodeManagerClientChan:
go func(nodeManager *models.NodeManagerClient) {
worker, err := nodeManager.Client.RegisterWorker(context.Background(), grpc.EmptyCallOption{})
if err != nil {
log.Error("Registration worker failed", err)
nodeManager.UpdateStatus(false)
log.Warn("Update nm status is false")
return
}
msgRespWorker := NewMsgRespWorker()
for i := 0; i < 2; i++ {
go msgRespWorker.SendMsg()
}
taskMsgWorker := NewTaskWorker(m.DockerOp)
taskMsgWorker.HandlerTask(4)
proofWorker := validator.NewProofWorker()
// Proactively report device info
go func(isSelect bool) {
ticker := time.NewTicker(time.Millisecond)
isSend := false
for {
select {
case <-ticker.C:
if isSend {
log.Info("The once-off message is send")
return
}
ticker = time.NewTicker(time.Second * 20)
msgRespWorker.RegisterMsgResp(nodeManager, worker, RegisterInfoResp, nil)
time.Sleep(time.Second * 2)
msgRespWorker.RegisterMsgResp(nodeManager, worker, DeviceInfoResp, nil)
if len(m.DockerOp.ReportTaskIds) == 0 {
m.DockerOp.ModelTaskIdIndexesChan <- []uint64{0}
} else {
m.DockerOp.ModelTaskIdIndexesChan <- m.DockerOp.ReportTaskIds
}
isSend = true
log.Info("------------------------Send once-off message ended------------------------")
}
}
}(nodeManager.IsSelected)
nodeManagerHandler := NewNodeManagerHandler(nodeManager, worker, msgRespWorker, taskMsgWorker)
// Report image info
go nodeManagerHandler.reportModelInfo(m.DockerOp)
log.Info("Report model info started")
// Proof storage
//go proofWorker.ProofStorage()
//log.Info("Proof storage worker started")
// Proof submission
//go proofWorker.CommitWitness()
//log.Info("Proof commit worker started")
go nodeManagerHandler.handlerStandardTask()
log.Info("Handler standard task worker started")
// Handle messages
for i := 0; i < 2; i++ {
go nodeManagerHandler.handlerMsg(m.NodeManagerMsgChan, proofWorker)
}
log.Info("------------------------Start rev msg worker thread------------------------")
for {
sub := time.Now().Sub(nodeManager.GetLastHeartTime()).Seconds()
log.WithField("time(uint seconds)", sub).Info("Handler nm msg thread monitor heartbeat time")
rev, err := worker.Recv()
if int64(sub) > conf.GetConfig().HeartRespTimeSecond || err != nil {
log.Error("Rev failed:", err)
//params := buildParams(fmt.Sprint("Rev failed:", err))
//msgRespWorker.RegisterMsgResp(nodeManager, worker, GoodbyeResp, params)
nodeManager.UpdateStatus(false)
log.Error("Node manager heartbeat is over")
return
}
log.Info("---------------------received message success---------------------")
m.NodeManagerMsgChan <- rev
log.Info("---------------------The message input channel success---------------------")
}
}(managerClient)
}
}
}
func (m *MonitorNm) monitorNodeManagerSeed() {
ticker := time.NewTicker(time.Second * 1)
for {
select {
case <-ticker.C:
seed := conf.GetConfig().NmSeed
log.Info("Nm seed url:", seed)
seedServiceClient := operate.ConnNmGrpc(seed)
if seedServiceClient == nil {
log.Warn("Connect nm seed service client is nil")
continue
}
list, err := seedServiceClient.ManagerList(context.Background(), &nodeManagerV1.ManagerListRequest{}, grpc.EmptyCallOption{})
if err != nil {
log.WithError(err).Warn("Get manager list failed through nm seed service")
continue
}
if list.GetManagers() == nil || len(list.GetManagers()) == 0 {
log.Warn("Get managers is empty through Nm seed service")
continue
}
for _, node := range list.GetManagers() {
if isExistNodeManager(node) {
log.Warn("Node manager is already exist and updated")
continue
}
nodeManagerArr = append(nodeManagerArr, &NodeManager{Info: node, IsUsed: false, IsExist: true})
}
m.IsInit = true
ticker = time.NewTicker(time.Minute * 10)
}
}
}
package nm
import (
"context"
"example.com/m/conf"
"example.com/m/log"
"example.com/m/operate"
nodeManagerV1 "github.com/odysseus/odysseus-protocol/gen/proto/go/nodemanager/v1"
"google.golang.org/grpc"
"time"
)
func monitorNodeManagerSeed() {
ticker := time.NewTicker(time.Second * 1)
for {
select {
case <-ticker.C:
seed := conf.GetConfig().NmSeed
log.Info("Nm seed url:", seed)
seedServiceClient := operate.ConnNmGrpc(seed)
if seedServiceClient == nil {
log.Warn("Connect nm seed service client is nil")
continue
}
list, err := seedServiceClient.ManagerList(context.Background(), &nodeManagerV1.ManagerListRequest{}, grpc.EmptyCallOption{})
if err != nil {
log.WithError(err).Warn("Get manager list failed through nm seed service")
continue
}
if list.GetManagers() == nil || len(list.GetManagers()) == 0 {
log.Warn("Get managers is empty through Nm seed service")
continue
}
for _, node := range list.GetManagers() {
if isExistNodeManager(node) {
log.Warn("Node manager is already exist and updated")
continue
}
nodeManagerArr = append(nodeManagerArr, &NodeManager{Info: node, IsUsed: false, IsExist: true})
}
isInit = true
ticker = time.NewTicker(time.Minute * 10)
}
}
}
package nm
import (
"example.com/m/conf"
"example.com/m/log"
"example.com/m/models"
"example.com/m/operate"
"example.com/m/utils"
"example.com/m/validator"
"fmt"
nodeManagerV1 "github.com/odysseus/odysseus-protocol/gen/proto/go/nodemanager/v1"
"time"
)
type NodeManagerHandler struct {
nodeManager *models.NodeManagerClient
worker nodeManagerV1.NodeManagerService_RegisterWorkerClient
msgRespWorker *RespMsgWorker
taskMsgWorker *TaskHandler
}
func NewNodeManagerHandler(nodeManager *models.NodeManagerClient, worker nodeManagerV1.NodeManagerService_RegisterWorkerClient, msgRespWorker *RespMsgWorker, taskMsgWorker *TaskHandler) *NodeManagerHandler {
return &NodeManagerHandler{
nodeManager: nodeManager,
worker: worker,
msgRespWorker: msgRespWorker,
taskMsgWorker: taskMsgWorker,
}
}
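For reference, monitorNmClient above starts this handler roughly as follows (a condensed restatement of the existing wiring, using the variable names from that function):
nodeManagerHandler := NewNodeManagerHandler(nodeManager, worker, msgRespWorker, taskMsgWorker)
go nodeManagerHandler.reportModelInfo(m.DockerOp) // report model/task-id indexes to the node manager
go nodeManagerHandler.handlerStandardTask()       // periodically fetch standard tasks while idle
for i := 0; i < 2; i++ {
	go nodeManagerHandler.handlerMsg(m.NodeManagerMsgChan, proofWorker) // consume manager messages concurrently
}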
func (n *NodeManagerHandler) handlerMsg(nodeManagerMsgChan chan *nodeManagerV1.ManagerMessage, proofWorker *validator.ProofWorker) {
for {
select {
case rev := <-nodeManagerMsgChan:
{
if !n.nodeManager.Status {
log.Warn("handlerMsg -> node manager is not running")
return
}
heartbeatReq := rev.GetHeartbeatRequest()
if heartbeatReq != nil {
n.nodeManager.UpdateLastHeartTime(time.Now())
params := utils.BuildParams(heartbeatReq.Timestamp)
n.msgRespWorker.RegisterMsgResp(n.nodeManager, n.worker, HeartbeatResp, params)
log.Info("-------------Heart beat req:-------------", heartbeatReq)
continue
}
taskMsg := rev.GetPushTaskMessage()
if taskMsg != nil {
params := utils.BuildParams(taskMsg.TaskId)
n.msgRespWorker.RegisterMsgResp(n.nodeManager, n.worker, RespTaskAck, params)
go func(msgRespWorker *RespMsgWorker,
taskMsgWorker *TaskHandler, taskMsg *nodeManagerV1.PushTaskMessage) {
if !taskMsgWorker.DockerOp.IsHealthy {
//params := utils.BuildParams(taskMsgWorker.DockerOp.Reason)
//msgRespWorker.RegisterMsgResp(nodeManager, worker, GoodbyeResp, params)
return
}
taskMsgWorker.Wg.Add(1)
taskMsgWorker.TaskMsg <- taskMsg
taskMsgWorker.Wg.Wait()
taskExecResInterface, _ := taskMsgWorker.LruCache.Get(taskMsg.TaskId)
//log.WithField("result", taskExecResInterface).Info("lru cache get task result")
taskExecRes := &models.TaskResult{
TaskHttpStatusCode: 200,
TaskRespBody: nil,
TaskHttpHeaders: nil,
TaskIsSuccess: false,
TaskExecTime: 0,
TaskExecError: "",
}
if taskExecResInterface != nil {
taskExecRes = taskExecResInterface.(*models.TaskResult)
}
isSuccess := taskExecRes.TaskIsSuccess
containerSign := make([]byte, 0)
if taskExecRes.TaskRespBody != nil {
containerSign = taskMsgWorker.DockerOp.GetContainerSign(taskMsg, taskExecRes.TaskRespBody)
if containerSign == nil || len(containerSign) == 0 {
log.Error("Container signing failed................")
isSuccess = false
taskExecRes.TaskExecError = fmt.Sprintf("%s-%s", "Container sign failed", taskExecRes.TaskExecError)
}
} else {
isSuccess = false
taskExecRes.TaskExecError = fmt.Sprintf("worker:%s-%s-%s", conf.GetConfig().SignPublicAddress.Hex(), "Task exec error", taskExecRes.TaskExecError)
}
reqHash, respHash, minerSign := taskMsgWorker.GetMinerSign(taskMsg, taskExecRes.TaskRespBody)
params := utils.BuildParams(taskMsg.TaskId, containerSign, minerSign, taskExecRes, isSuccess)
taskMsgWorker.Mutex.Lock()
taskMsgWorker.LruCache.Add(taskMsg.TaskId+models.TaskType, taskMsg.TaskType)
taskMsgWorker.LruCache.Add(taskMsg.TaskId+models.ContainerSign, containerSign)
taskMsgWorker.LruCache.Add(taskMsg.TaskId+models.MinerSign, minerSign)
taskMsgWorker.LruCache.Add(taskMsg.TaskId+models.ReqHash, reqHash)
taskMsgWorker.LruCache.Add(taskMsg.TaskId+models.RespHash, respHash)
taskMsgWorker.Mutex.Unlock()
msgRespWorker.RegisterMsgResp(n.nodeManager, n.worker, SubmitResultResp, params)
log.Info("--------------taskMsg--------------:", taskMsg)
}(n.msgRespWorker, n.taskMsgWorker, taskMsg)
continue
}
nmResultMsg := rev.GetProofTaskResult()
if nmResultMsg != nil {
//containerSign, _ := taskMsgWorker.LruCache.Get(nmResultMsg.TaskId + models.ContainerSign)
//if !ok {
// log.Error("taskMsgWorker.LruCache.Get failed: ", nmSignMsg.TaskUuid+models.ContainerSign)
//}
//minerSign, _ := taskMsgWorker.LruCache.Get(nmResultMsg.TaskId + models.MinerSign)
//if !ok {
// log.Error("taskMsgWorker.LruCache.Get failed: ", nmSignMsg.TaskUuid+models.MinerSign)
//}
//reqHash, _ := taskMsgWorker.LruCache.Get(nmResultMsg.TaskId + models.ReqHash)
//if !ok {
// log.Error("taskMsgWorker.LruCache.Get failed: ", nmSignMsg.TaskUuid+models.ReqHash)
//}
//respHash, _ := taskMsgWorker.LruCache.Get(nmResultMsg.TaskId + models.RespHash)
//if !ok {
// log.Error("taskMsgWorker.LruCache.Get failed: ", nmSignMsg.TaskUuid+models.RespHash)
//}
//taskType, _ := taskMsgWorker.LruCache.Get(nmResultMsg.TaskId + models.TaskType)
//proofWorker.ProductProof(nmResultMsg, taskType.(uint64), reqHash.([]byte), respHash.([]byte), containerSign.([]byte), minerSign.([]byte))
log.WithField("proof", nmResultMsg).Info("Output proof task result")
continue
}
deviceUsageMsg := rev.GetDeviceUsage()
if deviceUsageMsg != nil {
n.msgRespWorker.RegisterMsgResp(n.nodeManager, n.worker, DeviceUsageResp, nil)
log.Info(deviceUsageMsg)
continue
}
nodeInfoMsg := rev.GetNodeInfoRequest()
if nodeInfoMsg != nil {
n.msgRespWorker.RegisterMsgResp(n.nodeManager, n.worker, NodeInfoResp, nil)
log.Info(nodeInfoMsg)
continue
}
statusReqMsg := rev.GetStatusRequest()
if statusReqMsg != nil {
n.msgRespWorker.RegisterMsgResp(n.nodeManager, n.worker, StatusResp, nil)
log.Info(statusReqMsg)
continue
}
goodByeMsg := rev.GetGoodbyeMessage()
if goodByeMsg != nil {
reason := goodByeMsg.GetReason()
log.Infof("Server endpoint:%s , good bye reason : %s", n.nodeManager.Endpoint, reason)
n.nodeManager.UpdateStatus(false)
log.Warn("Update nm status is false")
continue
}
}
}
}
}
func (n *NodeManagerHandler) handlerStandardTask() {
//ticker := time.NewTicker(time.Second * 30)
ticker := time.NewTicker(time.Minute * 5)
for {
select {
case <-ticker.C:
{
if n.taskMsgWorker.IsExecStandardTask {
continue
}
if !n.taskMsgWorker.IsExecAiTask {
n.msgRespWorker.RegisterMsgResp(n.nodeManager, n.worker, FetchStandardTaskResp, nil)
break
}
}
}
}
}
func (n *NodeManagerHandler) reportModelInfo(dockerOp *operate.DockerOp) {
for {
select {
case taskIdIndexes := <-dockerOp.ModelTaskIdIndexesChan:
if !n.nodeManager.Status {
log.WithField("endpoint", n.nodeManager.Endpoint).Error("Node manager is down , stop report model info")
return
}
params := utils.BuildParams(taskIdIndexes)
n.msgRespWorker.RegisterMsgResp(n.nodeManager, n.worker, SubmitResourceMapRes, params)
}
}
}
@@ -211,23 +211,6 @@ func StatusResp(params ...interface{}) *nodemanagerV1.WorkerMessage {
return statusRes
}
func GoodbyeResp(params ...interface{}) *nodemanagerV1.WorkerMessage {
log.Info("Goodbye resp received params:", params)
reason := ""
if len(params) > 0 {
reason = params[0].(string)
}
goodbyeMsgRes := &nodemanagerV1.WorkerMessage{
Message: &nodemanagerV1.WorkerMessage_GoodbyeMessage{
GoodbyeMessage: &nodemanagerV1.GoodbyeMessage{
Reason: reason,
},
},
}
log.Info("---------------------------------------Send good bye msg ------------------------------------")
return goodbyeMsgRes
}
func SubmitResultResp(params ...interface{}) *nodemanagerV1.WorkerMessage {
//log.Info("Handler task submit result resp received params:", params)
taskId := params[0].(string)
@@ -280,3 +263,20 @@ func RespTaskAck(params ...interface{}) *nodemanagerV1.WorkerMessage {
log.WithField("taskId", taskId).Info("---------------------------------------Send task ack msg ------------------------------------")
return taskAckMsgRes
}
func GoodbyeResp(params ...interface{}) *nodemanagerV1.WorkerMessage {
log.Info("Goodbye resp received params:", params)
reason := ""
if len(params) > 0 {
reason = params[0].(string)
}
goodbyeMsgRes := &nodemanagerV1.WorkerMessage{
Message: &nodemanagerV1.WorkerMessage_GoodbyeMessage{
GoodbyeMessage: &nodemanagerV1.GoodbyeMessage{
Reason: reason,
},
},
}
log.Info("---------------------------------------Send good bye msg ------------------------------------")
return goodbyeMsgRes
}
package nm
import (
"context"
"example.com/m/conf"
"example.com/m/largeModel"
"example.com/m/log"
"example.com/m/models"
"example.com/m/operate"
"example.com/m/utils"
"example.com/m/validator"
"fmt"
nodeManagerV1 "github.com/odysseus/odysseus-protocol/gen/proto/go/nodemanager/v1"
"google.golang.org/grpc"
"time"
)
// Specifies the address of the remote Docker service
var (
isInit = false
nodeManagerArr []*NodeManager
usedNodeManagerClient []*models.NodeManagerClient
nodeManagerClientChan chan *models.NodeManagerClient
nodeManagerMsgChan chan *nodeManagerV1.ManagerMessage
)
func init() {
nodeManagerArr = make([]*NodeManager, 0)
usedNodeManagerClient = make([]*models.NodeManagerClient, 0)
nodeManagerClientChan = make(chan *models.NodeManagerClient, 10)
nodeManagerMsgChan = make(chan *nodeManagerV1.ManagerMessage, 1000)
}
func StartMonitor() {
@@ -37,13 +28,17 @@ func StartMonitor() {
panic("Docker client is not healthy")
}
go monitorNodeManagerSeed()
modelHandler := largeModel.NewModelHandler(dockerOp)
go monitorWorker(dockerOp)
monitorNm := NewMonitorNm(dockerOp)
go monitorModelInfo(dockerOp)
go monitorNm.monitorNodeManagerSeed()
for !isInit {
go monitorNm.monitorNmClient()
go modelHandler.MonitorModelInfo()
for !monitorNm.IsInit {
}
var connectNodeManagerCount int64 = 0
@@ -67,7 +62,7 @@ func StartMonitor() {
} else {
isSelect = false
}
isSuccess := inputNodeManagerChan(manager, nil, isSelect)
isSuccess := inputNodeManagerChan(manager, nil, isSelect, monitorNm)
if !isSuccess {
log.Warn("Init input node manager chan failed")
continue
@@ -101,7 +96,7 @@ func StartMonitor() {
}
if !managerClient.IsDel {
// TODO: retry the connection three times
isSuccess := inputNodeManagerChan(manager, managerClient, managerClient.IsSelected)
isSuccess := inputNodeManagerChan(manager, managerClient, managerClient.IsSelected, monitorNm)
log.WithField("is success", isSuccess).Warn("Try to connect node manager client:", manager.Info.Endpoint)
if isSuccess {
log.Info("Connect node manager client success:", manager.Info.Endpoint)
@@ -130,7 +125,7 @@ func StartMonitor() {
for i := 0; i < len(unUsedNodeManagers); i++ {
randomNum := utils.GenerateRandomNumber(conf.GetConfig().SignPrivateKey, int64(len(nodeManagerArr)))
unUsedManager := unUsedNodeManagers[randomNum.Int64()]
isSuccess := inputNodeManagerChan(unUsedManager, nil, false)
isSuccess := inputNodeManagerChan(unUsedManager, nil, false, monitorNm)
if !isSuccess {
log.Warn("Connect unused node manager client error:", manager.Info.Endpoint)
break
@@ -141,262 +136,3 @@ func StartMonitor() {
}
}
}
// monitorWorker listens for worker clients
func monitorWorker(op *operate.DockerOp) {
log.Info("Monitoring worker thread start......")
for {
select {
case managerClient := <-nodeManagerClientChan:
go func(nodeManager *models.NodeManagerClient) {
worker, err := nodeManager.Client.RegisterWorker(context.Background(), grpc.EmptyCallOption{})
if err != nil {
log.Error("Registration worker failed", err)
nodeManager.UpdateStatus(false)
log.Warn("Update nm status is false")
return
}
msgRespWorker := NewMsgRespWorker()
for i := 0; i < 2; i++ {
go msgRespWorker.SendMsg()
}
taskMsgWorker := NewTaskWorker(op)
taskMsgWorker.HandlerTask(4)
proofWorker := validator.NewProofWorker()
// Proactively report device info
go func(isSelect bool) {
ticker := time.NewTicker(time.Millisecond)
isSend := false
for {
select {
case <-ticker.C:
if isSend {
log.Info("The once-off message is send")
return
}
ticker = time.NewTicker(time.Second * 20)
msgRespWorker.RegisterMsgResp(nodeManager, worker, RegisterInfoResp, nil)
time.Sleep(time.Second * 2)
msgRespWorker.RegisterMsgResp(nodeManager, worker, DeviceInfoResp, nil)
if len(op.ReportTaskIds) == 0 {
op.ModelTaskIdIndexesChan <- []uint64{0}
} else {
op.ModelTaskIdIndexesChan <- op.ReportTaskIds
}
isSend = true
log.Info("------------------------Send once-off message ended------------------------")
}
}
}(nodeManager.IsSelected)
// Report image info
go reportModelInfo(nodeManager, worker, msgRespWorker, op)
log.Info("Report model info started")
// Proof storage
//go proofWorker.ProofStorage()
//log.Info("Proof storage worker started")
// Proof submission
//go proofWorker.CommitWitness()
//log.Info("Proof commit worker started")
go handlerStandardTask(nodeManager, worker, msgRespWorker, taskMsgWorker)
log.Info("Handler standard task worker started")
// Handle messages
for i := 0; i < 2; i++ {
go handlerMsg(nodeManager, worker, msgRespWorker, taskMsgWorker, proofWorker)
}
log.Info("------------------------Start rev msg worker thread------------------------")
for {
sub := time.Now().Sub(nodeManager.GetLastHeartTime()).Seconds()
log.WithField("time(uint seconds)", sub).Info("Handler nm msg thread monitor heartbeat time")
rev, err := worker.Recv()
if int64(sub) > conf.GetConfig().HeartRespTimeSecond || err != nil {
log.Error("Rev failed:", err)
//params := buildParams(fmt.Sprint("Rev failed:", err))
//msgRespWorker.RegisterMsgResp(nodeManager, worker, GoodbyeResp, params)
nodeManager.UpdateStatus(false)
log.Error("Node manager heartbeat is over")
return
}
log.Info("---------------------received message success---------------------")
nodeManagerMsgChan <- rev
log.Info("---------------------The message input channel success---------------------")
}
}(managerClient)
}
}
}
func handlerStandardTask(nodeManager *models.NodeManagerClient,
worker nodeManagerV1.NodeManagerService_RegisterWorkerClient,
msgRespWorker *RespMsgWorker,
taskMsgWorker *TaskHandler) {
//ticker := time.NewTicker(time.Second * 30)
ticker := time.NewTicker(time.Minute * 5)
for {
select {
case <-ticker.C:
{
if taskMsgWorker.IsExecStandardTask {
continue
}
if !taskMsgWorker.IsExecAiTask {
msgRespWorker.RegisterMsgResp(nodeManager, worker, FetchStandardTaskResp, nil)
break
}
}
}
}
}
// handlerMsg handles messages via goroutines
func handlerMsg(nodeManager *models.NodeManagerClient,
worker nodeManagerV1.NodeManagerService_RegisterWorkerClient,
msgRespWorker *RespMsgWorker,
taskMsgWorker *TaskHandler,
proofWorker *validator.ProofWorker) {
for {
select {
case rev := <-nodeManagerMsgChan:
{
if !nodeManager.Status {
log.Warn("handlerMsg -> node manager is not running")
return
}
heartbeatReq := rev.GetHeartbeatRequest()
if heartbeatReq != nil {
nodeManager.UpdateLastHeartTime(time.Now())
params := buildParams(heartbeatReq.Timestamp)
msgRespWorker.RegisterMsgResp(nodeManager, worker, HeartbeatResp, params)
log.Info("-------------Heart beat req:-------------", heartbeatReq)
continue
}
taskMsg := rev.GetPushTaskMessage()
if taskMsg != nil {
params := buildParams(taskMsg.TaskId)
msgRespWorker.RegisterMsgResp(nodeManager, worker, RespTaskAck, params)
go func(msgRespWorker *RespMsgWorker,
taskMsgWorker *TaskHandler, taskMsg *nodeManagerV1.PushTaskMessage) {
if !taskMsgWorker.DockerOp.IsHealthy {
//params := buildParams(taskMsgWorker.DockerOp.Reason)
//msgRespWorker.RegisterMsgResp(nodeManager, worker, GoodbyeResp, params)
return
}
taskMsgWorker.Wg.Add(1)
taskMsgWorker.TaskMsg <- taskMsg
taskMsgWorker.Wg.Wait()
taskExecResInterface, _ := taskMsgWorker.LruCache.Get(taskMsg.TaskId)
//log.WithField("result", taskExecResInterface).Info("lru cache get task result")
taskExecRes := &models.TaskResult{
TaskHttpStatusCode: 200,
TaskRespBody: nil,
TaskHttpHeaders: nil,
TaskIsSuccess: false,
TaskExecTime: 0,
TaskExecError: "",
}
if taskExecResInterface != nil {
taskExecRes = taskExecResInterface.(*models.TaskResult)
}
isSuccess := taskExecRes.TaskIsSuccess
containerSign := make([]byte, 0)
if taskExecRes.TaskRespBody != nil {
containerSign = taskMsgWorker.DockerOp.GetContainerSign(taskMsg, taskExecRes.TaskRespBody)
if containerSign == nil || len(containerSign) == 0 {
log.Error("Container signing failed................")
isSuccess = false
taskExecRes.TaskExecError = fmt.Sprintf("%s-%s", "Container sign failed", taskExecRes.TaskExecError)
}
} else {
isSuccess = false
taskExecRes.TaskExecError = fmt.Sprintf("worker:%s-%s-%s", conf.GetConfig().SignPublicAddress.Hex(), "Task exec error", taskExecRes.TaskExecError)
}
reqHash, respHash, minerSign := taskMsgWorker.GetMinerSign(taskMsg, taskExecRes.TaskRespBody)
params := buildParams(taskMsg.TaskId, containerSign, minerSign, taskExecRes, isSuccess)
taskMsgWorker.Mutex.Lock()
taskMsgWorker.LruCache.Add(taskMsg.TaskId+models.TaskType, taskMsg.TaskType)
taskMsgWorker.LruCache.Add(taskMsg.TaskId+models.ContainerSign, containerSign)
taskMsgWorker.LruCache.Add(taskMsg.TaskId+models.MinerSign, minerSign)
taskMsgWorker.LruCache.Add(taskMsg.TaskId+models.ReqHash, reqHash)
taskMsgWorker.LruCache.Add(taskMsg.TaskId+models.RespHash, respHash)
taskMsgWorker.Mutex.Unlock()
msgRespWorker.RegisterMsgResp(nodeManager, worker, SubmitResultResp, params)
log.Info("--------------taskMsg--------------:", taskMsg)
}(msgRespWorker, taskMsgWorker, taskMsg)
continue
}
nmResultMsg := rev.GetProofTaskResult()
if nmResultMsg != nil {
//containerSign, _ := taskMsgWorker.LruCache.Get(nmResultMsg.TaskId + models.ContainerSign)
//if !ok {
// log.Error("taskMsgWorker.LruCache.Get failed: ", nmSignMsg.TaskUuid+models.ContainerSign)
//}
//minerSign, _ := taskMsgWorker.LruCache.Get(nmResultMsg.TaskId + models.MinerSign)
//if !ok {
// log.Error("taskMsgWorker.LruCache.Get failed: ", nmSignMsg.TaskUuid+models.MinerSign)
//}
//reqHash, _ := taskMsgWorker.LruCache.Get(nmResultMsg.TaskId + models.ReqHash)
//if !ok {
// log.Error("taskMsgWorker.LruCache.Get failed: ", nmSignMsg.TaskUuid+models.ReqHash)
//}
//respHash, _ := taskMsgWorker.LruCache.Get(nmResultMsg.TaskId + models.RespHash)
//if !ok {
// log.Error("taskMsgWorker.LruCache.Get failed: ", nmSignMsg.TaskUuid+models.RespHash)
//}
//taskType, _ := taskMsgWorker.LruCache.Get(nmResultMsg.TaskId + models.TaskType)
//proofWorker.ProductProof(nmResultMsg, taskType.(uint64), reqHash.([]byte), respHash.([]byte), containerSign.([]byte), minerSign.([]byte))
log.WithField("proof", nmResultMsg).Info("Output proof task result")
continue
}
deviceUsageMsg := rev.GetDeviceUsage()
if deviceUsageMsg != nil {
msgRespWorker.RegisterMsgResp(nodeManager, worker, DeviceUsageResp, nil)
log.Info(deviceUsageMsg)
continue
}
nodeInfoMsg := rev.GetNodeInfoRequest()
if nodeInfoMsg != nil {
msgRespWorker.RegisterMsgResp(nodeManager, worker, NodeInfoResp, nil)
log.Info(nodeInfoMsg)
continue
}
statusReqMsg := rev.GetStatusRequest()
if statusReqMsg != nil {
msgRespWorker.RegisterMsgResp(nodeManager, worker, StatusResp, nil)
log.Info(statusReqMsg)
continue
}
goodByeMsg := rev.GetGoodbyeMessage()
if goodByeMsg != nil {
reason := goodByeMsg.GetReason()
log.Infof("Server endpoint:%s , good bye reason : %s", nodeManager.Endpoint, reason)
nodeManager.UpdateStatus(false)
log.Warn("Update nm status is false")
continue
}
}
}
}
}
func buildParams(params ...interface{}) []interface{} {
res := make([]interface{}, len(params))
for i, param := range params {
res[i] = param
}
return res
}
@@ -36,10 +36,10 @@ type TaskHandler struct {
IsExecAiTask bool
IsExecStandardTask bool
ExecTaskIdIsSuccess *sync.Map
oldTaskImageName string
oldTaskId string
}
var oldTaskImageName, oldTaskId string
func NewTaskWorker(op *operate.DockerOp) *TaskHandler {
return &TaskHandler{
Wg: &sync.WaitGroup{},
@@ -86,6 +86,19 @@ func (t *TaskHandler) HandlerTask(runCount int) {
}
}
func (t *TaskHandler) GetMinerSign(msg *nodeManagerV1.PushTaskMessage, taskResult []byte) ([]byte, []byte, []byte) {
reqHash := crypto.Keccak256Hash(msg.TaskParam)
respHash := crypto.Keccak256Hash(taskResult)
signHash := crypto.Keccak256Hash(bytes.NewBufferString(msg.TaskId).Bytes(), reqHash.Bytes(), respHash.Bytes())
log.WithField("hash", signHash.String()).Info("Miner sign result")
sign, err := crypto.Sign(signHash.Bytes(), conf.GetConfig().SignPrivateKey)
if err != nil {
log.Error("custom task handler")
}
log.Info("Miner sign:", common.Bytes2Hex(sign))
return reqHash.Bytes(), respHash.Bytes(), sign
}
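GetMinerSign signs Keccak256(taskId || reqHash || respHash) with the node's secp256k1 key. As a hedged illustration (not shown in this commit), a counterpart could recover the signing address with go-ethereum's crypto package; verifyMinerSign is a hypothetical helper name:
// Sketch only: taskId, reqHash, respHash and sign correspond to msg.TaskId and the three return values above.
func verifyMinerSign(taskId string, reqHash, respHash, sign []byte) (common.Address, error) {
	signHash := crypto.Keccak256Hash([]byte(taskId), reqHash, respHash)
	pubKey, err := crypto.SigToPub(signHash.Bytes(), sign)
	if err != nil {
		return common.Address{}, err
	}
	return crypto.PubkeyToAddress(*pubKey), nil
}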
func (t *TaskHandler) SystemTaskHandler(taskMsg *nodeManagerV1.PushTaskMessage) {
defer t.Wg.Done()
@@ -114,7 +127,7 @@ func (t *TaskHandler) ComputeTaskHandler(taskMsg *nodeManagerV1.PushTaskMessage)
return
}
log.Info("received task cmd :", taskCmd)
log.WithField("oldTaskImageName", oldTaskImageName).WithField("newTaskImageName", taskCmd.ImageName).Info("task image info")
log.WithField("t.oldTaskImageName", t.oldTaskImageName).WithField("newTaskImageName", taskCmd.ImageName).Info("task image info")
if taskMsg.TaskKind != baseV1.TaskKind_StandardTask {
t.checkIsStopContainer(taskCmd)
}
@@ -240,21 +253,28 @@ func (t *TaskHandler) ComputeTaskHandler(taskMsg *nodeManagerV1.PushTaskMessage)
t.ExecTaskIdIsSuccess.Store(taskMsg.TaskId, true)
return
}
isUseFileCache := true
if taskMsg.TaskKind != baseV1.TaskKind_StandardTask {
isUseFileCache := true
isUseRedirect := false
for key, value := range taskParam.Headers {
if key == models.UseRedirect {
if value[0] == "true" {
isUseRedirect = true
}
}
if key == models.UseFileCache {
if value[0] == "0" {
if value[0] == "false" {
isUseFileCache = false
break
}
}
}
log.WithField("isUseRedirect", isUseRedirect).Info("is use redirect")
log.WithField("isUseFileCache", isUseFileCache).Info("is use file cache")
if readBody != nil {
data := parseData(readBody)
if data != nil {
isRedirect := false
isSuccess := false
switch v := data.(type) {
case [][]string:
@@ -284,12 +304,11 @@ func (t *TaskHandler) ComputeTaskHandler(taskMsg *nodeManagerV1.PushTaskMessage)
}
continue
}
if ossUri != "" && len(res) == 1 && len(innerSlice) == 1 {
if isUseRedirect && ossUri != "" && len(res) == 1 && len(innerSlice) == 1 {
taskExecResult.TaskHttpStatusCode = models.RedirectCode
apiResBody := utils.EncodeJsonEscapeHTML(ossUri)
taskExecResult.TaskRespBody = apiResBody
post.Header.Set("Location", ossUri)
isRedirect = true
isSuccess = true
break
} else {
@@ -304,7 +323,7 @@ func (t *TaskHandler) ComputeTaskHandler(taskMsg *nodeManagerV1.PushTaskMessage)
t.ExecTaskIdIsSuccess.Store(taskMsg.TaskId, true)
return
}
if !isRedirect {
if !isUseRedirect {
apiResBody := utils.EncodeJsonEscapeHTML(apiRes)
taskExecResult.TaskRespBody = apiResBody
}
@@ -334,12 +353,11 @@ func (t *TaskHandler) ComputeTaskHandler(taskMsg *nodeManagerV1.PushTaskMessage)
}
continue
}
if ossUri != "" && len(res) == 1 {
if isUseRedirect && ossUri != "" && len(res) == 1 {
taskExecResult.TaskHttpStatusCode = models.RedirectCode
post.Header.Set("Location", ossUri)
apiResBody := utils.EncodeJsonEscapeHTML(ossUri)
taskExecResult.TaskRespBody = apiResBody
isRedirect = true
isSuccess = true
break
} else {
@@ -352,7 +370,7 @@ func (t *TaskHandler) ComputeTaskHandler(taskMsg *nodeManagerV1.PushTaskMessage)
t.ExecTaskIdIsSuccess.Store(taskMsg.TaskId, true)
return
}
if !isRedirect {
if !isUseRedirect {
apiResBody := utils.EncodeJsonEscapeHTML(apiRes)
taskExecResult.TaskRespBody = apiResBody
}
@@ -383,12 +401,11 @@ func (t *TaskHandler) ComputeTaskHandler(taskMsg *nodeManagerV1.PushTaskMessage)
}
continue
}
if ossUri != "" && len(resArr) == 1 {
if isUseRedirect && ossUri != "" && len(resArr) == 1 {
taskExecResult.TaskHttpStatusCode = models.RedirectCode
post.Header.Set("Location", ossUri)
apiResBody := utils.EncodeJsonEscapeHTML(ossUri)
taskExecResult.TaskRespBody = apiResBody
isRedirect = true
isSuccess = true
break
} else {
@@ -401,7 +418,7 @@ func (t *TaskHandler) ComputeTaskHandler(taskMsg *nodeManagerV1.PushTaskMessage)
t.ExecTaskIdIsSuccess.Store(taskMsg.TaskId, true)
return
}
if !isRedirect {
if !isUseRedirect {
apiResBody := utils.EncodeJsonEscapeHTML(apiRes)
taskExecResult.TaskRespBody = apiResBody
}
@@ -465,6 +482,16 @@ func (t *TaskHandler) ComputeTaskHandler(taskMsg *nodeManagerV1.PushTaskMessage)
log.Info("received computeTask--------------------------------")
}
func (t *TaskHandler) CustomTaskHandler(taskMsg *nodeManagerV1.PushTaskMessage) {
defer t.Wg.Done()
_, err := t.DockerOp.PsImages()
if err != nil {
log.Error("custom task handler docker op ps images failed: ", err)
return
}
log.Info("received customTask--------------------------------")
}
func (t *TaskHandler) foundTaskImage(taskExecResult *models.TaskResult, taskCmd *models.TaskCmd, taskMsg *nodeManagerV1.PushTaskMessage) (isSuccess bool, imageId string) {
images, err := t.DockerOp.PsImages()
if err != nil {
@@ -493,29 +520,6 @@ func (t *TaskHandler) foundTaskImage(taskExecResult *models.TaskResult, taskCmd
return
}
func (t *TaskHandler) CustomTaskHandler(taskMsg *nodeManagerV1.PushTaskMessage) {
defer t.Wg.Done()
_, err := t.DockerOp.PsImages()
if err != nil {
log.Error("custom task handler docker op ps images failed: ", err)
return
}
log.Info("received customTask--------------------------------")
}
func (t *TaskHandler) GetMinerSign(msg *nodeManagerV1.PushTaskMessage, taskResult []byte) ([]byte, []byte, []byte) {
reqHash := crypto.Keccak256Hash(msg.TaskParam)
respHash := crypto.Keccak256Hash(taskResult)
signHash := crypto.Keccak256Hash(bytes.NewBufferString(msg.TaskId).Bytes(), reqHash.Bytes(), respHash.Bytes())
log.WithField("hash", signHash.String()).Info("Miner sign result")
sign, err := crypto.Sign(signHash.Bytes(), conf.GetConfig().SignPrivateKey)
if err != nil {
log.Error("custom task handler")
}
log.Info("Miner sign:", common.Bytes2Hex(sign))
return reqHash.Bytes(), respHash.Bytes(), sign
}
func (t *TaskHandler) foundImageIsRunning(imageId string) (bool, string, uint16) {
containers := t.DockerOp.ListContainer()
for _, container := range containers {
@@ -621,29 +625,29 @@ func (t *TaskHandler) checkLastTaskExecStatus(taskMsg *nodeManagerV1.PushTaskMes
} else if taskMsg.TaskKind == baseV1.TaskKind_StandardTask {
t.IsExecStandardTask = true
}
if oldTaskId != taskMsg.TaskId {
if t.oldTaskId != taskMsg.TaskId {
now := time.Now()
for {
since := time.Since(now)
if int64(since.Seconds()) > conf.GetConfig().WaitLastTaskExecTime {
log.WithField("taskId", oldTaskId).Info("Waiting for last task execution ending")
oldTaskId = taskMsg.TaskId
log.WithField("taskId", t.oldTaskId).Info("Waiting for last task execution ending")
t.oldTaskId = taskMsg.TaskId
break
}
if oldTaskId == "" {
oldTaskId = taskMsg.TaskId
if t.oldTaskId == "" {
t.oldTaskId = taskMsg.TaskId
break
}
value, ok := t.ExecTaskIdIsSuccess.Load(oldTaskId)
value, ok := t.ExecTaskIdIsSuccess.Load(t.oldTaskId)
//log.WithField("isSuccess", value).Info("Task id exec info")
if !ok {
//log.WithField("task id", oldTaskId).Warn("task exec is not finished")
//log.WithField("task id", t.oldTaskId).Warn("task exec is not finished")
continue
}
isSuccess := value.(bool)
if isSuccess {
oldTaskId = taskMsg.TaskId
log.WithField("taskId", oldTaskId).Info("Task exec success")
t.oldTaskId = taskMsg.TaskId
log.WithField("taskId", t.oldTaskId).Info("Task exec success")
break
}
}
@@ -681,7 +685,7 @@ func (t *TaskHandler) checkContainerHealthy(internalIp string, internalPort uint
}
func (t *TaskHandler) checkIsStopContainer(taskCmd *models.TaskCmd) {
if oldTaskImageName != "" && oldTaskImageName != taskCmd.ImageName {
if t.oldTaskImageName != "" && t.oldTaskImageName != taskCmd.ImageName {
// TODO: stop the standard-task container
containers := t.DockerOp.ListContainer()
for _, container := range containers {
@@ -689,17 +693,17 @@ func (t *TaskHandler) checkIsStopContainer(taskCmd *models.TaskCmd) {
if len(split) == 1 {
container.Image = fmt.Sprintf("%s:%s", container.Image, "latest")
}
log.WithField("containerImageName", container.Image).WithField("oldTaskImageName", oldTaskImageName).Info("match image")
if container.Image == oldTaskImageName && container.State == "running" {
log.WithField("containerImageName", container.Image).WithField("t.oldTaskImageName", t.oldTaskImageName).Info("match image")
if container.Image == t.oldTaskImageName && container.State == "running" {
t.DockerOp.StopContainer(container.ID)
log.WithField("Image name", container.Image).Info("Stopping container")
//t.DockerOp.RunningImages[oldTaskImageName] = false
//t.DockerOp.RunningImages[t.oldTaskImageName] = false
break
}
}
oldTaskImageName = taskCmd.ImageName
t.oldTaskImageName = taskCmd.ImageName
} else {
oldTaskImageName = taskCmd.ImageName
t.oldTaskImageName = taskCmd.ImageName
}
}
......
@@ -167,3 +167,11 @@ func EncodeJsonEscapeHTML(apiRes any) []byte {
//log.WithField("apiResBody", string(apiResBody.Bytes())).Info("model resp")
return apiResBody.Bytes()
}
func BuildParams(params ...interface{}) []interface{} {
res := make([]interface{}, len(params))
for i, param := range params {
res[i] = param
}
return res
}
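BuildParams simply copies its variadic arguments into a []interface{}; it replaces the package-local buildParams helper removed from nm in this commit. A typical call site, taken from the heartbeat handling above:
params := utils.BuildParams(heartbeatReq.Timestamp)
n.msgRespWorker.RegisterMsgResp(n.nodeManager, n.worker, HeartbeatResp, params)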