Commit cef4b011 authored by duanjinfei's avatar duanjinfei

add resp header byte

parent 86aaab34
......@@ -23,10 +23,6 @@ type TaskReq struct {
TaskResult []byte `json:"task_result"`
}
type ContainerSignStruct struct {
Sign []byte `json:"sign"`
}
type ModelInfo struct {
TaskId uint64 `json:"task_id"`
User string `json:"user"`
......
......@@ -190,15 +190,17 @@ func SubmitResultResp(params ...interface{}) *nodemanagerV1.WorkerMessage {
taskId := params[0].(string)
containerSign := params[1].([]byte)
minerSign := params[2].([]byte)
taskResult := params[3].([]byte)
isSuccess := params[4].(bool)
taskResultHeader := params[3].([]byte)
taskResultBody := params[4].([]byte)
isSuccess := params[5].(bool)
submitResultMsgRes := &nodemanagerV1.WorkerMessage{
Message: &nodemanagerV1.WorkerMessage_SubmitTaskResult{
SubmitTaskResult: &nodemanagerV1.SubmitTaskResult{
TaskUuid: taskId,
ContainerSignature: containerSign,
MinerSignature: minerSign,
TaskResult: taskResult,
TaskResultHeader: taskResultHeader,
TaskResultBody: taskResultBody,
IsSuccessed: isSuccess,
},
},
......
......@@ -246,15 +246,16 @@ func handlerMsg(nodeManager *models.NodeManagerClient,
taskMsgWorker.Wg.Add(1)
taskMsgWorker.TaskMsg <- taskMsg
taskMsgWorker.Wg.Wait()
taskResBytes := taskMsgWorker.TaskResp[taskMsg.TaskUuid]
taskResHeader := taskMsgWorker.TaskRespHeader[taskMsg.TaskUuid]
taskResBody := taskMsgWorker.TaskRespBody[taskMsg.TaskUuid]
isSuccess := taskMsgWorker.TaskIsSuccess[taskMsg.TaskUuid]
containerSign := taskMsgWorker.DockerOp.GetContainerSign(taskMsg, taskResBytes)
containerSign := taskMsgWorker.DockerOp.GetContainerSign(taskMsg, taskResBody)
if containerSign == nil || len(containerSign) == 0 {
log.Error("Container signing failed................")
isSuccess = false
}
reqHash, respHash, minerSign := taskMsgWorker.GetMinerSign(taskMsg, taskResBytes)
params := buildParams(taskMsg.TaskUuid, containerSign, minerSign, taskResBytes, isSuccess)
reqHash, respHash, minerSign := taskMsgWorker.GetMinerSign(taskMsg, taskResBody)
params := buildParams(taskMsg.TaskUuid, containerSign, minerSign, taskResHeader, taskResBody, isSuccess)
taskMsgWorker.LruCache.Add(taskMsg.TaskUuid+models.ContainerSign, containerSign)
taskMsgWorker.LruCache.Add(taskMsg.TaskUuid+models.MinerSign, minerSign)
taskMsgWorker.LruCache.Add(taskMsg.TaskUuid+models.ReqHash, reqHash)
......
......@@ -22,25 +22,27 @@ import (
)
type TaskHandler struct {
Wg *sync.WaitGroup
LruCache *lru.Cache
DockerOp *operate.DockerOp
CmdOp *operate.Command
TaskMsg chan *nodeManagerV1.PushTaskMessage
TaskResp map[string][]byte
TaskIsSuccess map[string]bool
HttpClient *http.Client
Wg *sync.WaitGroup
LruCache *lru.Cache
DockerOp *operate.DockerOp
CmdOp *operate.Command
TaskMsg chan *nodeManagerV1.PushTaskMessage
TaskRespHeader map[string][]byte
TaskRespBody map[string][]byte
TaskIsSuccess map[string]bool
HttpClient *http.Client
}
func NewTaskWorker(op *operate.DockerOp) *TaskHandler {
return &TaskHandler{
Wg: &sync.WaitGroup{},
LruCache: lru.New(100),
DockerOp: op,
TaskMsg: make(chan *nodeManagerV1.PushTaskMessage, 0),
TaskResp: make(map[string][]byte, 0),
TaskIsSuccess: make(map[string]bool, 0),
HttpClient: &http.Client{},
Wg: &sync.WaitGroup{},
LruCache: lru.New(100),
DockerOp: op,
TaskMsg: make(chan *nodeManagerV1.PushTaskMessage, 0),
TaskRespHeader: make(map[string][]byte, 0),
TaskRespBody: make(map[string][]byte, 0),
TaskIsSuccess: make(map[string]bool, 0),
HttpClient: &http.Client{},
}
}
......@@ -81,7 +83,8 @@ func (t *TaskHandler) SystemTaskHandler(taskMsg *nodeManagerV1.PushTaskMessage)
func (t *TaskHandler) ComputeTaskHandler(taskMsg *nodeManagerV1.PushTaskMessage) {
defer t.Wg.Done()
t.TaskResp[taskMsg.TaskUuid] = nil
t.TaskRespBody[taskMsg.TaskUuid] = nil
t.TaskRespHeader[taskMsg.TaskUuid] = nil
t.TaskIsSuccess[taskMsg.TaskUuid] = false
reader := bytes.NewReader(taskMsg.TaskParam)
taskCmd := &models.TaskCmd{}
......@@ -157,22 +160,19 @@ func (t *TaskHandler) ComputeTaskHandler(taskMsg *nodeManagerV1.PushTaskMessage)
return
}
if post.StatusCode == http.StatusOK {
readBody, err := io.ReadAll(post.Body)
headers, err := json.Marshal(post.Header)
if err != nil {
log.Error("received error: ", err)
log.Error("JSON marshal header error: ", err)
return
}
res := &models.ComputeResult{}
err = json.Unmarshal(readBody, res)
readBody, err := io.ReadAll(post.Body)
if err != nil {
log.Error("received error: ", err)
return
}
if res.Code == "200" {
log.Info(string(readBody))
t.TaskResp[taskMsg.TaskUuid] = readBody
t.TaskIsSuccess[taskMsg.TaskUuid] = true
}
t.TaskRespHeader[taskMsg.TaskUuid] = headers
t.TaskRespBody[taskMsg.TaskUuid] = readBody
t.TaskIsSuccess[taskMsg.TaskUuid] = true
}
log.Info("received computeTask--------------------------------")
}
......
#!/bin/bash
rm -rf node node.log
echo "rm data successful"
go get
echo "go get successful"
go build -o node
nohup ./node > node.log 2>&1 &
\ No newline at end of file
echo "build successful"
nohup ./node > node.log 2>&1 &
echo "running successful"
\ No newline at end of file
......@@ -30,9 +30,9 @@ func TestJson(t *testing.T) {
taskCmd := &TaskCmd{
ImageName: "onlydd/llm-server:0119",
DockerCmd: &DockerCmd{
ContainerPort: "80",
ContainerPort: "8888",
},
ApiUrl: "https://192.168.1.120:5001/aigic",
ApiUrl: "https://192.168.1.120:8888/llm/test/get/sign",
}
marshal, err := json.Marshal(taskCmd)
......@@ -46,14 +46,15 @@ func TestJson(t *testing.T) {
func TestTaskHandler_computeTaskHandler(t1 *testing.T) {
type fields struct {
wg *sync.WaitGroup
lruCache *lru.Cache
DockerOp *operate.DockerOp
CmdOp *operate.Command
TaskMsg chan *nodeManagerV1.PushTaskMessage
TaskResp map[string][]byte
TaskIsSuccess map[string]bool
HttpClient *http.Client
wg *sync.WaitGroup
lruCache *lru.Cache
DockerOp *operate.DockerOp
CmdOp *operate.Command
TaskMsg chan *nodeManagerV1.PushTaskMessage
TaskRespHeader map[string][]byte
TaskRespBody map[string][]byte
TaskIsSuccess map[string]bool
HttpClient *http.Client
}
type args struct {
......@@ -101,13 +102,14 @@ func TestTaskHandler_computeTaskHandler(t1 *testing.T) {
{
"test send task",
fields{
wg: &sync.WaitGroup{},
lruCache: lru.New(100),
DockerOp: operate.NewDockerOp(),
TaskMsg: make(chan *nodeManagerV1.PushTaskMessage, 0),
TaskResp: make(map[string][]byte, 0),
TaskIsSuccess: make(map[string]bool, 0),
HttpClient: &http.Client{},
wg: &sync.WaitGroup{},
lruCache: lru.New(100),
DockerOp: operate.NewDockerOp(),
TaskMsg: make(chan *nodeManagerV1.PushTaskMessage, 0),
TaskRespHeader: make(map[string][]byte, 0),
TaskRespBody: make(map[string][]byte, 0),
TaskIsSuccess: make(map[string]bool, 0),
HttpClient: &http.Client{},
},
n,
},
......@@ -115,13 +117,14 @@ func TestTaskHandler_computeTaskHandler(t1 *testing.T) {
for _, tt := range tests {
t1.Run(tt.name, func(t1 *testing.T) {
t := &nm.TaskHandler{
Wg: tt.fields.wg,
LruCache: tt.fields.lruCache,
DockerOp: tt.fields.DockerOp,
TaskMsg: tt.fields.TaskMsg,
TaskResp: tt.fields.TaskResp,
TaskIsSuccess: tt.fields.TaskIsSuccess,
HttpClient: tt.fields.HttpClient,
Wg: tt.fields.wg,
LruCache: tt.fields.lruCache,
DockerOp: tt.fields.DockerOp,
TaskMsg: tt.fields.TaskMsg,
TaskRespHeader: tt.fields.TaskRespHeader,
TaskRespBody: tt.fields.TaskRespBody,
TaskIsSuccess: tt.fields.TaskIsSuccess,
HttpClient: tt.fields.HttpClient,
}
tt.fields.wg.Add(1)
t.ComputeTaskHandler(tt.args.taskMsg)
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment