Commit 4b9cdbf1 authored by duanjinfei

update model res upload file cache

parent 0df6dac6
......@@ -25,6 +25,7 @@ type Config struct {
ChainID int64 `json:"chain_id"`
ApiUrl string `json:"api_url"`
ValidatorUrl string `json:"validator_url"`
OssUrl string `json:"oss_url"`
}
// _cfg is the package-level configuration pointer; it starts out nil.
var _cfg *Config
......
......@@ -6,5 +6,6 @@
"task_validator_time": 1,
"container_num": 1,
"chain_id": 100,
"validator_url": "43.198.252.255:20011"
"validator_url": "43.198.252.255:20011",
"oss_url": "http://43.198.252.255:13000/api/v1/upload"
}
\ No newline at end of file
package models
const (
AiPaint = "aipaint"
Chat = "chat"
Picture = "picture"
Language = "language"
TaskType = "taskType"
ContainerSign = "container"
MinerSign = "miner"
ReqHash = "reqHash"
RespHash = "respHash"
TaskType = "taskType"
ContainerSign = "container"
MinerSign = "miner"
ReqHash = "reqHash"
RespHash = "respHash"
ResultFileExpiresDB = "ResultFileExpiresDB"
RedirectCode = 307
UseFileCache = "USE-FILE-CACHE"
)
......@@ -3,6 +3,7 @@ package models
import (
nodeManagerV1 "github.com/odysseus/odysseus-protocol/gen/proto/go/nodemanager/v1"
"sync"
"time"
)
type TaskCmd struct {
......@@ -23,16 +24,48 @@ type TaskReq struct {
TaskResult []byte `json:"task_result"`
}
// TaskParam is the JSON envelope that a pushed task's TaskParam bytes are
// decoded into before the request is forwarded to the model container.
type TaskParam struct {
// Headers holds multi-valued header entries. They are consulted for the
// USE-FILE-CACHE and ResultFileExpiresDB options.
Headers map[string][]string `json:"headers"`
// Queries is encoded into the query string appended to the container API URL.
Queries map[string]string `json:"queries"`
// Body is the payload posted to the container (encoding/json carries a
// []byte as a base64 string).
Body []byte `json:"body"`
}
// TaskResult collects the outcome of one compute task. ComputeTaskHandler
// stores it in the LRU cache under the task ID, and SubmitResultResp copies
// its fields into the SubmitTaskResult message.
type TaskResult struct {
// TaskHttpHeaders is the container's response header map, JSON-encoded.
TaskHttpHeaders []byte
// TaskExecTime is the measured task duration in microseconds.
TaskExecTime int64
// TaskHttpStatusCode is the status reported for the task: it is initialised
// to 200, becomes RedirectCode (307) when the result was uploaded to the file
// cache, and otherwise mirrors a non-200 status returned by the container.
TaskHttpStatusCode int32
// TaskRespBody is the raw response body read from the container.
TaskRespBody []byte
// TaskIsSuccess is set to true only on the 200 OK path, after the response
// body and headers were processed without error.
TaskIsSuccess bool
}
// ModelResponse is the shape the container's JSON response body is decoded
// into when the file cache is in use.
type ModelResponse struct {
// Output lists the model outputs. When it has exactly one entry and that
// entry passes utils.IsBase64ImageStr, ComputeTaskHandler uploads it to the
// file cache.
Output []string `json:"output"`
}
// HardwareRequire describes hardware requirements; ModelInfo carries it under
// the "hardware_require" JSON key.
type HardwareRequire struct {
// DiskSize is kept as a string; its unit is not visible here — TODO confirm.
DiskSize string `json:"disk_size"`
// Gpus lists the GPU entries.
Gpus []*GpuInfo `json:"gpus"`
// MemorySize is kept as a string; its unit is not visible here — TODO confirm.
MemorySize string `json:"memory_size"`
}
// GpuInfo is one entry of HardwareRequire.Gpus.
type GpuInfo struct {
// Gpu identifies the GPU — presumably a model name; TODO confirm.
Gpu string `json:"gpu"`
}
type ModelInfo struct {
TaskId uint64 `json:"task_id"`
User string `json:"user"`
Pwd string `json:"pwd"`
Repository string `json:"repository"`
SignUrl string `json:"sign_url"`
ImageName string `json:"image_name"`
DiskSize int64 `json:"disk_size"`
MemorySize int64 `json:"memory_size"`
IsImageExist bool
Time time.Time `json:"time"`
Count int `json:"count"`
HardwareRequire *HardwareRequire `json:"hardware_require"`
Kind int `json:"kind"`
TaskId uint64 `json:"task_id"`
User string `json:"user"`
Pwd string `json:"pwd"`
SignUrl string `json:"sign_url"`
ImageName string `json:"image_name"`
DiskSize int64 `json:"disk_size"`
MemorySize int64 `json:"memory_size"`
OutPutJson string `json:"out_put_json"`
FileExpiresTime string `json:"file_expires_time"`
}
type ComputeResult struct {
......
......@@ -65,7 +65,6 @@ func monitorModelInfo(dockerOp *operate.DockerOp) {
// todo: 如果够用
if isPull {
go dockerOp.PullImage(modelInfo)
modelInfo.IsImageExist = true
// todo: 是否立马上报数据
// reportTaskIds = append(reportTaskIds, modelInfo.TaskId)
}
......
......@@ -8,6 +8,7 @@ import (
"github.com/docker/docker/libnetwork/bitmap"
nodemanagerV1 "github.com/odysseus/odysseus-protocol/gen/proto/go/nodemanager/v1"
"github.com/shirou/gopsutil/cpu"
"strconv"
)
// WorkerMsgHandler builds a WorkerMessage from positional, loosely-typed
// params; each handler type-asserts the arguments it expects.
type WorkerMsgHandler func(params ...interface{}) *nodemanagerV1.WorkerMessage
......@@ -140,20 +141,15 @@ func DeviceInfoResp(params ...interface{}) *nodemanagerV1.WorkerMessage {
if err != nil {
log.Error("Error getting CPU info: ", err)
}
if cpuInfos != nil {
for _, info := range cpuInfos {
log.WithField("core", info.Cores).WithField("cpu", info.ModelName).Info("cpu info")
for i, cpuInfo := range cpuInfos {
cpuInfo := &nodemanagerV1.DeviceInfo{
DeviceType: fmt.Sprintf("cpu-%d", i),
DeviceModel: cpuInfo.ModelName,
DevicePower: 12,
DeviceParam: strconv.FormatFloat(cpuInfo.Mhz, 'f', 2, 64),
}
devices = append(devices, cpuInfo)
}
//for i, cpuInfo := range cpuInfos {
// cpuInfo := &nodemanagerV1.DeviceInfo{
// DeviceType: fmt.Sprintf("cpu-%d", i),
// DeviceModel: cpuInfo.ModelName,
// DevicePower: 12,
// DeviceParam: strconv.FormatFloat(cpuInfo.Mhz, 'f', 2, 64),
// }
// devices = append(devices, cpuInfo)
//}
cpuInfo := &nodemanagerV1.DeviceInfo{
DeviceType: "cpu-0",
......@@ -236,19 +232,18 @@ func SubmitResultResp(params ...interface{}) *nodemanagerV1.WorkerMessage {
taskId := params[0].(string)
containerSign := params[1].([]byte)
minerSign := params[2].([]byte)
taskResultHeader := params[3].([]byte)
taskResultBody := params[4].([]byte)
taskResExecTime := params[5].(int64)
isSuccess := params[6].(bool)
taskExecResult := params[3].(*models.TaskResult)
isSuccess := params[4].(bool)
submitResultMsgRes := &nodemanagerV1.WorkerMessage{
Message: &nodemanagerV1.WorkerMessage_SubmitTaskResult{
SubmitTaskResult: &nodemanagerV1.SubmitTaskResult{
TaskId: taskId,
ContainerSignature: containerSign,
MinerSignature: minerSign,
TaskResultHeader: taskResultHeader,
TaskResultBody: taskResultBody,
TaskExecuteDuration: uint64(taskResExecTime),
TaskResultCode: taskExecResult.TaskHttpStatusCode,
TaskResultHeader: taskExecResult.TaskHttpHeaders,
TaskResultBody: taskExecResult.TaskRespBody,
TaskExecuteDuration: uint64(taskExecResult.TaskExecTime),
IsSuccessed: isSuccess,
},
},
......
......@@ -285,17 +285,16 @@ func handlerMsg(nodeManager *models.NodeManagerClient,
taskMsgWorker.Wg.Add(1)
taskMsgWorker.TaskMsg <- taskMsg
taskMsgWorker.Wg.Wait()
taskResHeader := taskMsgWorker.TaskRespHeader[taskMsg.TaskId]
taskResBody := taskMsgWorker.TaskRespBody[taskMsg.TaskId]
taskResExecTime := taskMsgWorker.TaskExecTime[taskMsg.TaskId]
isSuccess := taskMsgWorker.TaskIsSuccess[taskMsg.TaskId]
containerSign := taskMsgWorker.DockerOp.GetContainerSign(taskMsg, taskResBody)
taskExecResInterface, _ := taskMsgWorker.LruCache.Get(taskMsg.TaskId)
taskExecRes := taskExecResInterface.(*models.TaskResult)
isSuccess := taskExecRes.TaskIsSuccess
containerSign := taskMsgWorker.DockerOp.GetContainerSign(taskMsg, taskExecRes.TaskRespBody)
if containerSign == nil || len(containerSign) == 0 {
log.Error("Container signing failed................")
isSuccess = false
}
reqHash, respHash, minerSign := taskMsgWorker.GetMinerSign(taskMsg, taskResBody)
params := buildParams(taskMsg.TaskId, containerSign, minerSign, taskResHeader, taskResBody, taskResExecTime, isSuccess)
reqHash, respHash, minerSign := taskMsgWorker.GetMinerSign(taskMsg, taskExecRes.TaskRespBody)
params := buildParams(taskMsg.TaskId, containerSign, minerSign, taskExecRes, isSuccess)
taskMsgWorker.LruCache.Add(taskMsg.TaskId+models.TaskType, taskMsg.TaskType)
taskMsgWorker.LruCache.Add(taskMsg.TaskId+models.ContainerSign, containerSign)
taskMsgWorker.LruCache.Add(taskMsg.TaskId+models.MinerSign, minerSign)
......
......@@ -2,11 +2,13 @@ package nm
import (
"bytes"
"encoding/base64"
"encoding/json"
"example.com/m/conf"
"example.com/m/log"
"example.com/m/models"
"example.com/m/operate"
"example.com/m/utils"
"fmt"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/crypto"
......@@ -15,8 +17,10 @@ import (
nodeManagerV1 "github.com/odysseus/odysseus-protocol/gen/proto/go/nodemanager/v1"
"io"
"math/rand"
"mime/multipart"
"net/http"
"strconv"
"strings"
"sync"
"time"
)
......@@ -27,10 +31,6 @@ type TaskHandler struct {
DockerOp *operate.DockerOp
CmdOp *operate.Command
TaskMsg chan *nodeManagerV1.PushTaskMessage
TaskRespHeader map[string][]byte
TaskExecTime map[string]int64
TaskRespBody map[string][]byte
TaskIsSuccess map[string]bool
HttpClient *http.Client
IsExecAiTask bool
IsExecStandardTask bool
......@@ -40,16 +40,12 @@ var oldTaskImageName string
func NewTaskWorker(op *operate.DockerOp) *TaskHandler {
return &TaskHandler{
Wg: &sync.WaitGroup{},
LruCache: lru.New(100),
DockerOp: op,
TaskMsg: make(chan *nodeManagerV1.PushTaskMessage, 0),
TaskExecTime: make(map[string]int64, 0),
TaskRespHeader: make(map[string][]byte, 0),
TaskRespBody: make(map[string][]byte, 0),
TaskIsSuccess: make(map[string]bool, 0),
HttpClient: &http.Client{},
IsExecAiTask: false,
Wg: &sync.WaitGroup{},
LruCache: lru.New(100),
DockerOp: op,
TaskMsg: make(chan *nodeManagerV1.PushTaskMessage, 0),
HttpClient: &http.Client{},
IsExecAiTask: false,
}
}
......@@ -94,10 +90,13 @@ func (t *TaskHandler) SystemTaskHandler(taskMsg *nodeManagerV1.PushTaskMessage)
func (t *TaskHandler) ComputeTaskHandler(taskMsg *nodeManagerV1.PushTaskMessage) {
defer t.Wg.Done()
t.TaskRespBody[taskMsg.TaskId] = nil
t.TaskRespHeader[taskMsg.TaskId] = nil
t.TaskExecTime[taskMsg.TaskId] = 0
t.TaskIsSuccess[taskMsg.TaskId] = false
taskExecResult := &models.TaskResult{
TaskHttpStatusCode: 200,
TaskRespBody: nil,
TaskHttpHeaders: nil,
TaskIsSuccess: false,
TaskExecTime: 0,
}
taskCmd := &models.TaskCmd{}
err := json.Unmarshal(bytes.NewBufferString(taskMsg.TaskCmd).Bytes(), taskCmd)
if err != nil {
......@@ -181,7 +180,7 @@ func (t *TaskHandler) ComputeTaskHandler(taskMsg *nodeManagerV1.PushTaskMessage)
return
}
log.Infof("Started container with ID %s", containerId)
time.Sleep(time.Second * 10)
time.Sleep(time.Second * 20)
running, internalIp, internalPort = t.foundImageIsRunning(imageId)
if running {
taskCmd.ApiUrl = fmt.Sprintf("http://%s:%d%s", internalIp, internalPort, taskCmd.ApiUrl)
......@@ -194,7 +193,17 @@ func (t *TaskHandler) ComputeTaskHandler(taskMsg *nodeManagerV1.PushTaskMessage)
log.WithField("ApiUrl", taskCmd.ApiUrl).Info("The image is running")
}
startBeforeTaskTime := time.Now()
reqContainerBody := bytes.NewReader(taskMsg.TaskParam)
taskParam := &models.TaskParam{}
err = json.Unmarshal(taskMsg.TaskParam, taskParam)
if err != nil {
log.WithField("err", err).Error("Error unmarshalling task parameter")
return
}
reqContainerBody := bytes.NewReader(taskParam.Body)
if len(taskParam.Queries) > 0 {
queryString := utils.MatchContainerQueryString(taskParam.Queries)
taskCmd.ApiUrl = fmt.Sprintf("%s?%s", taskCmd.ApiUrl, queryString)
}
post, err := t.HttpClient.Post(taskCmd.ApiUrl, "application/json", reqContainerBody)
if err != nil {
log.WithField("error:", err).Error("Http client post request container failed")
......@@ -204,21 +213,66 @@ func (t *TaskHandler) ComputeTaskHandler(taskMsg *nodeManagerV1.PushTaskMessage)
log.WithField("time", endAfterTaskTime.Seconds()).WithField("taskId", taskMsg.TaskId).Info("Exec task end (second is units) :")
log.WithField("StatusCode", post.StatusCode).WithField("taskId", taskMsg.TaskId).Info("Exec task result")
if post.StatusCode == http.StatusOK {
headers, err := json.Marshal(post.Header)
taskExecResult.TaskHttpStatusCode = http.StatusOK
readBody, err := io.ReadAll(post.Body)
if err != nil {
log.Error("JSON marshal header error: ", err)
log.Error("received error: ", err)
return
}
readBody, err := io.ReadAll(post.Body)
isUseFileCache := true
for key, value := range taskParam.Headers {
if key == models.UseFileCache {
if value[0] == "0" {
isUseFileCache = false
break
}
}
}
if isUseFileCache && readBody != nil {
containerResp := &models.ModelResponse{}
err = json.Unmarshal(readBody, &containerResp)
if err != nil {
log.WithError(err).Error("Error unmarshalling oss resp body failed")
return
}
if len(containerResp.Output) == 1 {
if utils.IsBase64ImageStr(containerResp.Output[0]) {
imageStr := strings.SplitN(containerResp.Output[0], ",", 2)[1]
queryString := utils.MatchFileCacheQueryString(taskParam.Headers, taskCmd.ImageName, t.DockerOp.ModelsInfo)
ossUri, err := t.uploadOSS(taskMsg.TaskId, queryString, imageStr)
if err != nil {
log.WithError(err).Error("upload image into file cache failed")
return
}
log.WithField("uri", ossUri).Info("upload image OSS successful")
if ossUri != "" {
taskExecResult.TaskHttpStatusCode = models.RedirectCode
post.Header.Set("Location", ossUri)
}
}
}
}
headers, err := json.Marshal(post.Header)
if err != nil {
log.Error("received error: ", err)
log.Error("JSON marshal header error: ", err)
return
}
t.TaskRespHeader[taskMsg.TaskId] = headers
t.TaskRespBody[taskMsg.TaskId] = readBody
t.TaskIsSuccess[taskMsg.TaskId] = true
t.TaskExecTime[taskMsg.TaskId] = endAfterTaskTime.Microseconds()
taskExecResult.TaskHttpHeaders = headers
taskExecResult.TaskRespBody = readBody
taskExecResult.TaskIsSuccess = true
taskExecResult.TaskExecTime = endAfterTaskTime.Microseconds()
} else {
taskExecResult.TaskHttpStatusCode = int32(post.StatusCode)
if post.Body != nil {
all, err := io.ReadAll(post.Body)
if err != nil {
log.Error("JSON read error: ", err)
return
}
taskExecResult.TaskRespBody = all
} else {
taskExecResult.TaskRespBody = nil
}
log.WithField("error", post.Body).WithField("taskId", taskMsg.TaskId).Error("Exec task result is failed")
}
if taskMsg.TaskKind == baseV1.TaskKind_ComputeTask {
......@@ -226,6 +280,7 @@ func (t *TaskHandler) ComputeTaskHandler(taskMsg *nodeManagerV1.PushTaskMessage)
} else if taskMsg.TaskKind == baseV1.TaskKind_StandardTask {
t.IsExecStandardTask = false
}
t.LruCache.Add(taskMsg.TaskId, taskExecResult)
log.Info("received computeTask--------------------------------")
}
......@@ -266,3 +321,50 @@ func (t *TaskHandler) foundImageIsRunning(imageId string) (bool, string, uint16)
}
return false, "", 0
}
// uploadOSS uploads a base64-encoded image to the file-cache (OSS) service as
// a multipart form and returns the body of the service's response, which the
// caller uses as the URI of the stored file.
//
// taskId names the uploaded file ("<taskId>.png"), queries is an already
// encoded query string appended to the configured OssUrl, and base64Image is
// the standard-base64 payload without any "data:...;base64," prefix.
//
// An error is returned when the payload is not valid base64, the request
// cannot be built or sent, the response body cannot be read, or the service
// answers with a non-2xx status.
func (t *TaskHandler) uploadOSS(taskId string, queries string, base64Image string) (string, error) {
	// Decode first so invalid input fails before any request is assembled.
	decodedImage, err := base64.StdEncoding.DecodeString(base64Image)
	if err != nil {
		log.WithError(err).Error("Error decoding base64 image")
		return "", err
	}

	var requestBody bytes.Buffer
	writer := multipart.NewWriter(&requestBody)
	fileField, err := writer.CreateFormFile("file", fmt.Sprintf("%s.png", taskId))
	if err != nil {
		log.WithError(err).Error("Error creating form file")
		return "", err
	}
	if _, err = fileField.Write(decodedImage); err != nil {
		log.WithError(err).Error("Error copying file contents")
		return "", err
	}
	// Close writes the trailing multipart boundary; without it the body is
	// incomplete.
	if err = writer.Close(); err != nil {
		log.WithError(err).Error("Error closing writer")
		return "", err
	}

	ossUrl := fmt.Sprintf("%s?%s", conf.GetConfig().OssUrl, queries)
	request, err := http.NewRequest(http.MethodPost, ossUrl, &requestBody)
	if err != nil {
		return "", err
	}
	request.Header.Set("Content-Type", writer.FormDataContentType())

	response, err := t.HttpClient.Do(request)
	if err != nil {
		log.WithError(err).Error("Error request oss failed")
		return "", err
	}
	// The body must always be closed, otherwise the underlying connection
	// leaks and cannot be reused by the client.
	defer func() {
		if closeErr := response.Body.Close(); closeErr != nil {
			log.WithError(closeErr).Error("Error closing oss resp body")
		}
	}()

	ossRespBody, err := io.ReadAll(response.Body)
	if err != nil {
		log.WithError(err).Error("Error read oss resp body failed")
		return "", err
	}
	// A non-2xx answer carries an error payload, not a file URI; do not hand
	// it back to the caller as if the upload had succeeded.
	if response.StatusCode < http.StatusOK || response.StatusCode >= http.StatusMultipleChoices {
		err = fmt.Errorf("oss upload returned status %d: %s", response.StatusCode, ossRespBody)
		log.WithError(err).Error("Error request oss failed")
		return "", err
	}
	return string(ossRespBody), nil
}
......@@ -55,9 +55,15 @@ func NewDockerOp() *DockerOp {
}
func (d *DockerOp) GetContainerSign(taskMsg *nodemanagerv1.PushTaskMessage, taskRes []byte) []byte {
taskParam := &models.TaskParam{}
err := json.Unmarshal(taskMsg.TaskParam, taskParam)
if err != nil {
log.WithField("err", err).Error("Error unmarshalling task parameter")
return nil
}
reqBody := &models.TaskReq{
TaskId: taskMsg.TaskId,
TaskParam: taskMsg.TaskParam,
TaskParam: taskParam.Body,
TaskResult: taskRes,
}
body, err := json.Marshal(reqBody)
......@@ -286,14 +292,17 @@ func (d *DockerOp) PullImage(info *models.ModelInfo) {
log.Errorf("Error pulling image from %s: %v", info.ImageName, err)
return
}
defer response.Close()
defer func(response io.ReadCloser) {
err := response.Close()
if err != nil {
log.WithError(err).Error("Close image pull response failed")
}
}(response)
// 读取拉取镜像的输出
if _, err = io.ReadAll(response); err != nil {
log.Error(err)
log.WithError(err).Error("Read image pull response failed")
return
}
log.Info("Image pulled successfully.")
}
......
......@@ -4,13 +4,16 @@ import (
"crypto/ecdsa"
"crypto/elliptic"
"crypto/rand"
"encoding/base64"
"example.com/m/log"
"example.com/m/models"
"fmt"
"github.com/ethereum/go-ethereum/accounts/keystore"
"github.com/ethereum/go-ethereum/crypto"
"io/ioutil"
"math/big"
"net/url"
"os"
"strings"
)
// KeypadPwd is the directory that readPrivateKey lists and reads keystore
// files from.
const KeypadPwd = "keystore"
......@@ -71,13 +74,13 @@ func savePrivateKey(privateKey *ecdsa.PrivateKey) error {
}
func readPrivateKey() (*ecdsa.PrivateKey, error) {
file, err := ioutil.ReadDir(KeypadPwd)
file, err := os.ReadDir(KeypadPwd)
if err != nil {
return nil, err
}
for _, info := range file {
keystoreFile := fmt.Sprintf("%s%s%s", KeypadPwd, "/", info.Name())
jsonBytes, err := ioutil.ReadFile(keystoreFile)
jsonBytes, err := os.ReadFile(keystoreFile)
if err != nil {
log.Error("import ecdsa keystore error: ", err)
continue
......@@ -91,3 +94,45 @@ func readPrivateKey() (*ecdsa.PrivateKey, error) {
}
return nil, nil
}
// IsBase64ImageStr reports whether imageStr has the form "<prefix>,<payload>"
// (for example "data:image/png;base64,....") with a payload that decodes as
// standard base64.
//
// Everything up to and including the first comma is treated as the prefix.
// A string without a comma is reported as false rather than causing an
// out-of-range panic. Only base64 validity is checked; the decoded bytes are
// not inspected for being an actual image.
func IsBase64ImageStr(imageStr string) bool {
	comma := strings.IndexByte(imageStr, ',')
	if comma < 0 {
		return false
	}
	_, err := base64.StdEncoding.DecodeString(imageStr[comma+1:])
	return err == nil
}
// MatchFileCacheQueryString builds the encoded query string sent to the file
// cache, containing a single ResultFileExpiresDB parameter.
//
// The expiry value is chosen in this order of precedence:
//  1. the first value of the ResultFileExpiresDB entry in params, if present;
//  2. the FileExpiresTime of the first entry in modelsInfo whose ImageName
//     equals taskImageName and whose FileExpiresTime is non-empty;
//  3. the default "600".
//
// Exactly one value is emitted; a request-supplied expiry is no longer
// followed by the default. An entry in params with an empty value list is
// treated as absent.
func MatchFileCacheQueryString(params map[string][]string, taskImageName string, modelsInfo []*models.ModelInfo) string {
	expires := "600"
	if requested := params[models.ResultFileExpiresDB]; len(requested) > 0 {
		expires = requested[0]
	} else {
		for _, info := range modelsInfo {
			if info != nil && info.ImageName == taskImageName && info.FileExpiresTime != "" {
				expires = info.FileExpiresTime
				break
			}
		}
	}
	values := url.Values{}
	values.Add(models.ResultFileExpiresDB, expires)
	return values.Encode()
}
// MatchContainerQueryString turns params into a URL-encoded query string.
// Keys and values are escaped and the pairs are ordered by key, as done by
// url.Values.Encode; a nil or empty map yields the empty string.
func MatchContainerQueryString(params map[string]string) string {
	query := make(url.Values, len(params))
	for name := range params {
		// Map keys are unique, so each name gets exactly one value.
		query.Set(name, params[name])
	}
	return query.Encode()
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment