Commit 98c3e693 authored by duanjinfei's avatar duanjinfei

merge master

parents 356c2d3b 8a5d323b
......@@ -9,23 +9,24 @@ import (
)
type Config struct {
SignPrv string
SignPub string
DockerServer string
BenefitAddress string
HeartRespTimeMillis int64
ExternalIp string
SignPublicAddress common.Address
SignPrivateKey *ecdsa.PrivateKey
NmSeed string `json:"nm_seed"`
HeartRespTimeSecond int64 `json:"heart_response"`
TaskValidatorTime float64 `json:"task_validator_time"`
ContainerNum int64 `json:"container_num"`
NodeManagerNum int64 `json:"node_manager_num"`
ChainID int64 `json:"chain_id"`
ApiUrl string `json:"api_url"`
ValidatorUrl string `json:"validator_url"`
OssUrl string `json:"oss_url"`
SignPrv string
SignPub string
DockerServer string
BenefitAddress string
HeartRespTimeMillis int64
ExternalIp string
SignPublicAddress common.Address
SignPrivateKey *ecdsa.PrivateKey
NmSeed string `json:"nm_seed"`
HeartRespTimeSecond int64 `json:"heart_response"`
TaskValidatorTime float64 `json:"task_validator_time"`
ContainerNum int64 `json:"container_num"`
NodeManagerNum int64 `json:"node_manager_num"`
ChainID int64 `json:"chain_id"`
ApiUrl string `json:"api_url"`
ValidatorUrl string `json:"validator_url"`
OssUrl string `json:"oss_url"`
WaitLastTaskExecTime int64 `json:"wait_last_task_exec_time"`
}
var _cfg *Config = nil
......
......@@ -7,5 +7,6 @@
"container_num": 1,
"chain_id": 100,
"validator_url": "18.167.203.17:20011",
"oss_url": "https://tmp-file.agicoin.ai/api/v1/upload"
"oss_url": "https://tmp-file.agicoin.ai/api/v1/upload",
"wait_last_task_exec_time": 10
}
\ No newline at end of file
......@@ -15,7 +15,7 @@ import (
func monitorModelInfo(dockerOp *operate.DockerOp) {
client := &http.Client{}
ticker := time.NewTicker(time.Second * 1)
ticker := time.NewTicker(time.Second * 10)
for {
select {
case <-ticker.C:
......
......@@ -182,6 +182,7 @@ func monitorWorker(op *operate.DockerOp) {
msgRespWorker.RegisterMsgResp(nodeManager, worker, RegisterInfoResp, nil)
time.Sleep(time.Second * 2)
msgRespWorker.RegisterMsgResp(nodeManager, worker, DeviceInfoResp, nil)
op.ModelTaskIdIndexesChan <- op.ReportTaskIds
isSend = true
log.Info("------------------------Send once-off message ended------------------------")
}
......
......@@ -374,6 +374,13 @@ func (t *TaskHandler) ComputeTaskHandler(taskMsg *nodeManagerV1.PushTaskMessage)
apiResBody := utils.EncodeJsonEscapeHTML(apiRes)
taskExecResult.TaskRespBody = apiResBody
}
} else {
taskExecResult.TaskExecError = fmt.Sprintf("worker:%s,%s", conf.GetConfig().SignPublicAddress.Hex(), "Container resp ouput is nil")
t.ExecTaskIdIsSuccess.Store(taskMsg.TaskId, true)
apiRes := make([]string, 0)
apiResBody := utils.EncodeJsonEscapeHTML(apiRes)
taskExecResult.TaskRespBody = apiResBody
return
}
}
}
......@@ -577,7 +584,14 @@ func (t *TaskHandler) checkLastTaskExecStatus(taskMsg *nodeManagerV1.PushTaskMes
t.IsExecStandardTask = true
}
if oldTaskId != taskMsg.TaskId {
now := time.Now()
for {
since := time.Since(now)
if int64(since.Seconds()) > conf.GetConfig().WaitLastTaskExecTime {
log.WithField("taskId", oldTaskId).Info("Waiting for last task execution ending")
oldTaskId = taskMsg.TaskId
break
}
if oldTaskId == "" {
oldTaskId = taskMsg.TaskId
break
......@@ -652,6 +666,10 @@ func parseData(readBody []byte) interface{} {
log.WithError(err).Error("Parse json raw message failed")
return bytes.NewBuffer(readBody).String()
}
if m["output"] == nil {
log.WithField("output", nil).Warn("The container resp")
return nil
}
var outputTwoArray [][]string
if err := json.Unmarshal(m["output"], &outputTwoArray); err != nil {
log.WithField("err", err).Warn("parse two array output filed failed:")
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment