Commit 42ba8102 authored by duanjinfei's avatar duanjinfei

Merge branch 'master' into test

parents 37c62ece aa3f3228
...@@ -9,25 +9,26 @@ import ( ...@@ -9,25 +9,26 @@ import (
) )
type Config struct { type Config struct {
SignPrv string SignPrv string
SignPub string SignPub string
DockerServer string DockerServer string
BenefitAddress string BenefitAddress string
HeartRespTimeMillis int64 HeartRespTimeMillis int64
ExternalIp string ExternalIp string
SignPublicAddress common.Address SignPublicAddress common.Address
SignPrivateKey *ecdsa.PrivateKey SignPrivateKey *ecdsa.PrivateKey
NmSeed string `json:"nm_seed"` NmSeed string `json:"nm_seed"`
HeartRespTimeSecond int64 `json:"heart_response"` HeartRespTimeSecond int64 `json:"heart_response"`
TaskValidatorTime float64 `json:"task_validator_time"` TaskValidatorTime float64 `json:"task_validator_time"`
ContainerNum int64 `json:"container_num"` ContainerNum int64 `json:"container_num"`
NodeManagerNum int64 `json:"node_manager_num"` NodeManagerNum int64 `json:"node_manager_num"`
ChainID int64 `json:"chain_id"` ChainID int64 `json:"chain_id"`
ApiUrl string `json:"api_url"` ApiUrl string `json:"api_url"`
ValidatorUrl string `json:"validator_url"` ValidatorUrl string `json:"validator_url"`
OssUrl string `json:"oss_url"` OssUrl string `json:"oss_url"`
WaitLastTaskExecTime int64 `json:"wait_last_task_exec_time"` WaitLastTaskExecTime int64 `json:"wait_last_task_exec_time"`
OpSys string `json:"op_sys"` OpSys string `json:"op_sys"`
ReplicateImageNameSuffix string `json:"replicate_image_name_suffix"`
} }
var _cfg *Config = nil var _cfg *Config = nil
......
...@@ -9,5 +9,6 @@ ...@@ -9,5 +9,6 @@
"validator_url": "18.167.203.17:20011", "validator_url": "18.167.203.17:20011",
"oss_url": "https://tmp-file.aigic.ai/api/v1/upload", "oss_url": "https://tmp-file.aigic.ai/api/v1/upload",
"wait_last_task_exec_time": 60, "wait_last_task_exec_time": 60,
"op_sys": "linux" "op_sys": "linux",
"replicate_image_name_suffix": "docker.aigic.ai/ai"
} }
\ No newline at end of file
package models package models
const ( const (
TaskType = "taskType" TaskType = "taskType"
ContainerSign = "container" ContainerSign = "container"
MinerSign = "miner" MinerSign = "miner"
ReqHash = "reqHash" ReqHash = "reqHash"
RespHash = "respHash" RespHash = "respHash"
ResultFileExpiresDB = "expires" ResultFileExpiresDB = "expires"
ContentType = "type" ContentType = "type"
RedirectCode = 303 RedirectCode = 303
UseFileCache = "Use-File-Cache" UseFileCache = "Use-File-Cache"
UseRedirect = "Use-Redirect" UseRedirect = "Use-Redirect"
Prefer = "Prefer" Prefer = "Prefer"
Async = "respond-async" Async = "respond-async"
MaxExecTime = "MaxExecTime" MaxExecTime = "MaxExecTime"
HealthCheckAPI = "/health-check" HealthCheckAPI = "/health-check"
ReplicateImageNameSuffix = "docker.agicoin.ai/agicoin" READY = "READY"
READY = "READY" ZeroHost = "0.0.0.0"
ZeroHost = "0.0.0.0" ModelPublishStatusYes = 1
ModelPublishStatusYes = 1 ModelPublishStatusNo = 2
ModelPublishStatusNo = 2 DefaultMaxExecTime = 300
DefaultMaxExecTime = 300 DefaultTaskTimer = 2
DefaultTaskTimer = 2
) )
...@@ -197,25 +197,6 @@ func (t *TaskWorker) ComputeTaskHandler(taskMsg *nodeManagerV1.PushTaskMessage) ...@@ -197,25 +197,6 @@ func (t *TaskWorker) ComputeTaskHandler(taskMsg *nodeManagerV1.PushTaskMessage)
log.Info("Container ports:", internalPort) log.Info("Container ports:", internalPort)
log.WithField("ApiUrl", taskOp.taskCmd.ApiUrl).Info("The image is running") log.WithField("ApiUrl", taskOp.taskCmd.ApiUrl).Info("The image is running")
} }
reqContainerBody := bytes.NewReader(taskOp.taskParam.Body)
if len(taskOp.taskParam.Queries) > 0 {
queryString := utils.MatchContainerQueryString(taskOp.taskParam.Queries)
taskOp.taskCmd.ApiUrl = fmt.Sprintf("%s?%s", taskOp.taskCmd.ApiUrl, queryString)
log.WithField("ApiUrl", taskOp.taskCmd.ApiUrl).Info("The task param query str not empty")
}
taskOp.request, err = http.NewRequest("POST", taskOp.taskCmd.ApiUrl, reqContainerBody)
if err != nil {
log.WithField("error:", err).Error("New container request failed")
taskOp.taskExecResult.TaskExecError = fmt.Sprintf("%s,%s", "Http client new container request failed", err.Error())
t.ExecTaskIdIsSuccess.Store(taskMsg.TaskId, true)
return
}
taskOp.request.Header.Set("Content-Type", "application/json")
if err = taskOp.validateWebHook(); err != nil {
taskOp.taskExecResult.TaskExecError = fmt.Sprintf("%s", err.Error())
t.ExecTaskIdIsSuccess.Store(taskMsg.TaskId, true)
return
}
if err = taskOp.waitReqContainerOk(t.DockerOp); err != nil { if err = taskOp.waitReqContainerOk(t.DockerOp); err != nil {
taskOp.taskExecResult.TaskExecError = fmt.Sprintf("%s", err.Error()) taskOp.taskExecResult.TaskExecError = fmt.Sprintf("%s", err.Error())
t.ExecTaskIdIsSuccess.Store(taskMsg.TaskId, true) t.ExecTaskIdIsSuccess.Store(taskMsg.TaskId, true)
...@@ -476,7 +457,7 @@ func (op *TaskOp) waitContainerRunning(handler *TaskWorker, imageId string) erro ...@@ -476,7 +457,7 @@ func (op *TaskOp) waitContainerRunning(handler *TaskWorker, imageId string) erro
if !running { if !running {
continue continue
} }
if isMatch := strings.HasPrefix(op.taskCmd.ImageName, models.ReplicateImageNameSuffix); isMatch { if isMatch := strings.HasPrefix(op.taskCmd.ImageName, conf.GetConfig().ReplicateImageNameSuffix); isMatch {
if isReqSuccess, err := op.checkContainerHealthy(internalIp, internalPort); err != nil { if isReqSuccess, err := op.checkContainerHealthy(internalIp, internalPort); err != nil {
log.WithField("err", err).Errorf("check container healthy failed") log.WithField("err", err).Errorf("check container healthy failed")
return fmt.Errorf("%s-%s", "check container healthy failed", err.Error()) return fmt.Errorf("%s-%s", "check container healthy failed", err.Error())
...@@ -537,12 +518,25 @@ func (op *TaskOp) waitReqContainerOk(dockerOp *operate.DockerOp) error { ...@@ -537,12 +518,25 @@ func (op *TaskOp) waitReqContainerOk(dockerOp *operate.DockerOp) error {
log.Errorf("%s", "The maximum execution time for this task has been exceeded") log.Errorf("%s", "The maximum execution time for this task has been exceeded")
return fmt.Errorf("%s", "The maximum execution time for this task has been exceeded") return fmt.Errorf("%s", "The maximum execution time for this task has been exceeded")
} }
newQuest := utils.CloneRequest(op.request) reqContainerBody := bytes.NewReader(op.taskParam.Body)
post, err := op.httpClient.Do(newQuest) if len(op.taskParam.Queries) > 0 {
queryString := utils.MatchContainerQueryString(op.taskParam.Queries)
op.taskCmd.ApiUrl = fmt.Sprintf("%s?%s", op.taskCmd.ApiUrl, queryString)
log.WithField("ApiUrl", op.taskCmd.ApiUrl).Info("The task param query str not empty")
}
op.request, err = http.NewRequest("POST", op.taskCmd.ApiUrl, reqContainerBody)
if err != nil {
log.WithField("error:", err).Error("New container request failed")
return fmt.Errorf("%s,%s", "Http client new container request failed", err.Error())
}
op.request.Header.Set("Content-Type", "application/json")
if err = op.validateWebHook(); err != nil {
return fmt.Errorf("%s", err.Error())
}
post, err := op.httpClient.Do(op.request)
if err != nil { if err != nil {
log.WithField("error:", err).Error("Http client post request container failed") log.WithField("error:", err).Error("Http client post request container failed")
return fmt.Errorf("%s,%s", "Http client post request container failed", err.Error()) return fmt.Errorf("%s,%s", "Http client post request container failed", err.Error())
//continue
} }
log.WithField("StatusCode", post.StatusCode).WithField("taskId", op.taskMsg.TaskId).Info("Exec task result") log.WithField("StatusCode", post.StatusCode).WithField("taskId", op.taskMsg.TaskId).Info("Exec task result")
if post.StatusCode == http.StatusOK { if post.StatusCode == http.StatusOK {
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment