Commit 44aa3bda authored by duanjinfei's avatar duanjinfei

update handler task res

parent c42712d0
...@@ -2,23 +2,16 @@ package nm ...@@ -2,23 +2,16 @@ package nm
import ( import (
"bytes" "bytes"
"encoding/json"
"example.com/m/conf" "example.com/m/conf"
"example.com/m/log" "example.com/m/log"
"example.com/m/models"
"example.com/m/operate" "example.com/m/operate"
"fmt"
"github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/crypto" "github.com/ethereum/go-ethereum/crypto"
"github.com/golang/groupcache/lru" "github.com/golang/groupcache/lru"
baseV1 "github.com/odysseus/odysseus-protocol/gen/proto/go/base/v1" baseV1 "github.com/odysseus/odysseus-protocol/gen/proto/go/base/v1"
nodeManagerV1 "github.com/odysseus/odysseus-protocol/gen/proto/go/nodemanager/v1" nodeManagerV1 "github.com/odysseus/odysseus-protocol/gen/proto/go/nodemanager/v1"
"io"
"math/rand"
"net/http" "net/http"
"strconv"
"sync" "sync"
"time"
) )
type TaskHandler struct { type TaskHandler struct {
...@@ -85,98 +78,98 @@ func (t *TaskHandler) SystemTaskHandler(taskMsg *nodeManagerV1.PushTaskMessage) ...@@ -85,98 +78,98 @@ func (t *TaskHandler) SystemTaskHandler(taskMsg *nodeManagerV1.PushTaskMessage)
func (t *TaskHandler) ComputeTaskHandler(taskMsg *nodeManagerV1.PushTaskMessage) { func (t *TaskHandler) ComputeTaskHandler(taskMsg *nodeManagerV1.PushTaskMessage) {
defer t.Wg.Done() defer t.Wg.Done()
t.TaskRespBody[taskMsg.TaskUuid] = nil t.TaskRespBody[taskMsg.TaskUuid] = []byte{}
t.TaskRespHeader[taskMsg.TaskUuid] = nil t.TaskRespHeader[taskMsg.TaskUuid] = []byte{}
t.TaskExecTime[taskMsg.TaskUuid] = 0 t.TaskExecTime[taskMsg.TaskUuid] = 0
t.TaskIsSuccess[taskMsg.TaskUuid] = false t.TaskIsSuccess[taskMsg.TaskUuid] = true
reader := bytes.NewReader(taskMsg.TaskParam) //reader := bytes.NewReader(taskMsg.TaskParam)
taskCmd := &models.TaskCmd{} //taskCmd := &models.TaskCmd{}
err := json.Unmarshal(bytes.NewBufferString(taskMsg.TaskCmd).Bytes(), taskCmd) //err := json.Unmarshal(bytes.NewBufferString(taskMsg.TaskCmd).Bytes(), taskCmd)
if err != nil { //if err != nil {
log.Errorf("failed to unmarshal task cmd: %s", err.Error()) // log.Errorf("failed to unmarshal task cmd: %s", err.Error())
return // return
} //}
//
images, err := t.DockerOp.PsImages() //images, err := t.DockerOp.PsImages()
if err != nil { //if err != nil {
log.Error("Ps images failed:", err) // log.Error("Ps images failed:", err)
return // return
} //}
imageId := "" //imageId := ""
isFound := false //isFound := false
for _, image := range images { //for _, image := range images {
if isFound { // if isFound {
break // break
} // }
for _, tag := range image.RepoTags { // for _, tag := range image.RepoTags {
if tag == taskCmd.ImageName { // if tag == taskCmd.ImageName {
imageId = image.ID // imageId = image.ID
isFound = true // isFound = true
break // break
} // }
} // }
log.Println(image.ID) // log.Println(image.ID)
} //}
containers := t.DockerOp.ListContainer() //containers := t.DockerOp.ListContainer()
isImageRunExist := false //isImageRunExist := false
for _, container := range containers { //for _, container := range containers {
if container.ImageID == imageId { // if container.ImageID == imageId {
taskCmd.ApiUrl = fmt.Sprintf(taskCmd.ApiUrl, container.Ports[0].PublicPort) // taskCmd.ApiUrl = fmt.Sprintf(taskCmd.ApiUrl, container.Ports[0].PublicPort)
isImageRunExist = true // isImageRunExist = true
break // break
} // }
} //}
if !isImageRunExist { //if !isImageRunExist {
var externalPort int64 // var externalPort int64
for { // for {
// 设置种子以确保每次运行时生成不同的随机数序列 // // 设置种子以确保每次运行时生成不同的随机数序列
rand.Seed(time.Now().UnixNano()) // rand.Seed(time.Now().UnixNano())
// 生成一个介于 0 和 100 之间的随机整数 // // 生成一个介于 0 和 100 之间的随机整数
externalPort = rand.Int63n(10001) + 10000 // externalPort = rand.Int63n(10001) + 10000
log.Info("DockerOp UsedExternalPort :", t.DockerOp.UsedExternalPort[externalPort]) // log.Info("DockerOp UsedExternalPort :", t.DockerOp.UsedExternalPort[externalPort])
if t.DockerOp.UsedExternalPort[externalPort] { // if t.DockerOp.UsedExternalPort[externalPort] {
continue // continue
} // }
break // break
} // }
taskCmd.DockerCmd.HostPort = strconv.FormatInt(externalPort, 10) // taskCmd.DockerCmd.HostPort = strconv.FormatInt(externalPort, 10)
taskCmd.ApiUrl = fmt.Sprintf(taskCmd.ApiUrl, externalPort) // taskCmd.ApiUrl = fmt.Sprintf(taskCmd.ApiUrl, externalPort)
if int64(len(containers)) == conf.GetConfig().ContainerNum { // if int64(len(containers)) == conf.GetConfig().ContainerNum {
//todo: 待定,需要根据权重去停止哪个容器 // //todo: 待定,需要根据权重去停止哪个容器
t.DockerOp.StopAndDeleteContainer(containers[0].ID) // t.DockerOp.StopAndDeleteContainer(containers[0].ID)
} // }
containerId, err := t.DockerOp.CreateAndStartContainer(taskCmd.ImageName, taskCmd.DockerCmd) // containerId, err := t.DockerOp.CreateAndStartContainer(taskCmd.ImageName, taskCmd.DockerCmd)
if err != nil { // if err != nil {
log.Errorf("Create and start container failed: %s", err.Error()) // log.Errorf("Create and start container failed: %s", err.Error())
return // return
} // }
log.Infof("Started container with ID %s", containerId) // log.Infof("Started container with ID %s", containerId)
} //}
//
startBeforeTaskTime := time.Now() //startBeforeTaskTime := time.Now()
post, err := t.HttpClient.Post(taskCmd.ApiUrl, "application/json", reader) //post, err := t.HttpClient.Post(taskCmd.ApiUrl, "application/json", reader)
if err != nil { //if err != nil {
log.Error("Http client post error: ", err) // log.Error("Http client post error: ", err)
return // return
} //}
endAfterTaskTime := time.Since(startBeforeTaskTime) //endAfterTaskTime := time.Since(startBeforeTaskTime)
log.WithField("time", endAfterTaskTime.Seconds()).Info("Exec task end (second is units) :") //log.WithField("time", endAfterTaskTime.Seconds()).Info("Exec task end (second is units) :")
if post.StatusCode == http.StatusOK { //if post.StatusCode == http.StatusOK {
headers, err := json.Marshal(post.Header) // headers, err := json.Marshal(post.Header)
if err != nil { // if err != nil {
log.Error("JSON marshal header error: ", err) // log.Error("JSON marshal header error: ", err)
return // return
} // }
readBody, err := io.ReadAll(post.Body) // readBody, err := io.ReadAll(post.Body)
if err != nil { // if err != nil {
log.Error("received error: ", err) // log.Error("received error: ", err)
return // return
} // }
t.TaskRespHeader[taskMsg.TaskUuid] = headers // t.TaskRespHeader[taskMsg.TaskUuid] = headers
t.TaskRespBody[taskMsg.TaskUuid] = readBody // t.TaskRespBody[taskMsg.TaskUuid] = readBody
t.TaskIsSuccess[taskMsg.TaskUuid] = true // t.TaskIsSuccess[taskMsg.TaskUuid] = true
t.TaskExecTime[taskMsg.TaskUuid] = endAfterTaskTime.Microseconds() // t.TaskExecTime[taskMsg.TaskUuid] = endAfterTaskTime.Microseconds()
} //}
log.Info("received computeTask--------------------------------") log.Info("received computeTask--------------------------------")
} }
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment