Commit 44c177e8 authored by duanjinfei

update message resp

parent a097d561
 .idea
 logs
 *.DS_Store
-mydb
+data
 keystore
 powerNode
\ No newline at end of file
 package main

 import (
-	"encoding/json"
 	"example.com/m/conf"
 	"example.com/m/log"
 	"example.com/m/nm"
-	"example.com/m/utils"
 	"fmt"
 	"github.com/astaxie/beego"
-	"github.com/ethereum/go-ethereum/common"
-	"github.com/ethereum/go-ethereum/crypto"
+	"github.com/fsnotify/fsnotify"
 	"github.com/spf13/cobra"
 	"github.com/spf13/viper"
-	"io/ioutil"
 	"os"
 )
@@ -75,43 +71,32 @@ func initConfig() {
 	viper.AutomaticEnv()
+	viper.WatchConfig()
+	viper.OnConfigChange(func(e fsnotify.Event) {
+		// callback invoked after the configuration file changes
+		log.Warn("The configuration file has been modified...........")
+		err := viper.Unmarshal(conf.GetConfig())
+		if err != nil {
+			log.WithError(err).Error("Viper unmarshal cfg error:")
+			panic(fmt.Errorf("Viper unmarshal conf failed, err:%s \n", err))
+		}
+		conf.GetConfig().UpdateFiledInfo()
+		log.Info("Config file changed success:", e.Name)
+	})
 	// read the config file
 	if err := viper.ReadInConfig(); err != nil {
-		fmt.Println("Error reading config file:", err)
-		return
-	}
-	configFilePath := viper.ConfigFileUsed()
-	if configFilePath == "" {
-		// handle error
-		log.Error("config file path is empty")
-		panic("config file path is empty")
+		panic(fmt.Errorf("Error reading config file: %s ", err.Error()))
 	}
-	data, err := ioutil.ReadFile(configFilePath)
+	err := viper.Unmarshal(conf.GetConfig())
 	if err != nil {
 		// handle error
-		log.Error("Read cfg file error:", err)
-		panic("Read cfg file error")
-	}
-	err = json.Unmarshal(data, conf.GetConfig())
-	if err != nil {
-		// handle error
-		log.Error("Json unmarshal cfg error:", err)
-		panic("Json unmarshal cfg error")
-	}
-	conf.GetConfig().HeartRespTimeMillis = conf.GetConfig().HeartRespTimeSecond * 60 * 60 * 1000
-	prvKey, err := utils.GetPrv()
-	if err != nil {
-		panic("get prv error or delete keystore after restart")
+		log.Error("Viper unmarshal cfg error:", err)
+		panic("Viper unmarshal cfg error")
 	}
-	conf.GetConfig().SignPrivateKey = prvKey
-	ecdsaPub := prvKey.PublicKey
-	conf.GetConfig().SignPub = common.Bytes2Hex(crypto.FromECDSAPub(&ecdsaPub))
-	log.Info("PublicKey", conf.GetConfig().SignPub)
-	publicAddr := crypto.PubkeyToAddress(ecdsaPub)
-	log.Info("publicAddr:", publicAddr)
-	conf.GetConfig().SignPublicAddress = publicAddr
+	conf.GetConfig().UpdateFiledInfo()
 }

 func Execute() {
...
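The heart of this hunk is the switch from a one-shot ioutil.ReadFile + encoding/json load to viper's file watcher, so config edits take effect without a restart. A minimal, self-contained sketch of the same hot-reload pattern; the Config struct, field, and config.json file name are illustrative stand-ins, not the repo's:

```go
package main

import (
	"fmt"
	"log"

	"github.com/fsnotify/fsnotify"
	"github.com/spf13/viper"
)

// Config stands in for the repo's conf.Config; viper.Unmarshal matches
// fields via mapstructure tags, which is why the commit adds them below.
type Config struct {
	NmSeed string `mapstructure:"nm_seed"`
}

func main() {
	cfg := &Config{}
	viper.SetConfigFile("config.json") // assumed file name
	if err := viper.ReadInConfig(); err != nil {
		log.Fatalf("read config: %v", err)
	}
	if err := viper.Unmarshal(cfg); err != nil {
		log.Fatalf("unmarshal config: %v", err)
	}
	viper.WatchConfig() // fsnotify-based watcher
	viper.OnConfigChange(func(e fsnotify.Event) {
		// re-unmarshal into the same struct; derived fields get recomputed
		// here (the commit calls conf.GetConfig().UpdateFiledInfo() for that)
		if err := viper.Unmarshal(cfg); err != nil {
			log.Printf("re-unmarshal config: %v", err)
			return
		}
		fmt.Println("config reloaded:", e.Name, "nm_seed =", cfg.NmSeed)
	})
	select {} // block so the watcher keeps firing
}
```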
@@ -3,8 +3,10 @@ package conf
 import (
 	"crypto/ecdsa"
 	"example.com/m/log"
+	"example.com/m/utils"
 	"fmt"
 	"github.com/ethereum/go-ethereum/common"
+	"github.com/ethereum/go-ethereum/crypto"
 	"net/url"
 )
@@ -17,19 +19,17 @@ type Config struct {
 	ExternalIp        string
 	SignPublicAddress common.Address
 	SignPrivateKey    *ecdsa.PrivateKey
-	NmSeed                   string  `json:"nm_seed"`
-	HeartRespTimeSecond      int64   `json:"heart_response"`
-	TaskValidatorTime        float64 `json:"task_validator_time"`
-	ContainerNum             int64   `json:"container_num"`
-	NodeManagerNum           int64   `json:"node_manager_num"`
-	ChainID                  int64   `json:"chain_id"`
-	ApiUrl                   string  `json:"api_url"`
-	ValidatorUrl             string  `json:"validator_url"`
-	OssUrl                   string  `json:"oss_url"`
-	WaitLastTaskExecTime     int64   `json:"wait_last_task_exec_time"`
-	OpSys                    string  `json:"op_sys"`
-	ReplicateImageNameSuffix string  `json:"replicate_image_name_suffix"`
-	IsStopLastContainer      bool    `json:"is_stop_last_container"`
+	NmSeed                   string `json:"nm_seed" mapstructure:"nm_seed"`
+	HeartRespTimeSecond      int64  `json:"heart_response" mapstructure:"heart_response"`
+	NodeManagerNum           int64  `json:"node_manager_num" mapstructure:"node_manager_num"`
+	ChainID                  int64  `json:"chain_id" mapstructure:"chain_id"`
+	ApiUrl                   string `json:"api_url" mapstructure:"api_url"`
+	ValidatorUrl             string `json:"validator_url" mapstructure:"validator_url"`
+	OssUrl                   string `json:"oss_url" mapstructure:"oss_url"`
+	WaitLastTaskExecTime     int64  `json:"wait_last_task_exec_time" mapstructure:"wait_last_task_exec_time"`
+	OpSys                    string `json:"op_sys" mapstructure:"op_sys"`
+	ReplicateImageNameSuffix string `json:"replicate_image_name_suffix" mapstructure:"replicate_image_name_suffix"`
+	IsStopLastContainer      bool   `json:"is_stop_last_container" mapstructure:"is_stop_last_container"`
 }

 var _cfg *Config = nil
@@ -80,6 +80,21 @@ func (c *Config) SetOpSys(sys string) bool {
 	return true
 }

+func (c *Config) UpdateFiledInfo() {
+	c.HeartRespTimeMillis = c.HeartRespTimeSecond * 60 * 60 * 1000
+	prvKey, err := utils.GetPrv()
+	if err != nil {
+		panic("get prv error or delete keystore after restart")
+	}
+	c.SignPrivateKey = prvKey
+	ecdsaPub := prvKey.PublicKey
+	c.SignPub = common.Bytes2Hex(crypto.FromECDSAPub(&ecdsaPub))
+	log.Info("PublicKey", c.SignPub)
+	publicAddr := crypto.PubkeyToAddress(ecdsaPub)
+	log.Info("publicAddr:", publicAddr)
+	c.SignPublicAddress = publicAddr
+}
+
 func checkDockerServer(rawURL string) (bool, string) {
 	if rawURL == "" {
 		return true, fmt.Sprintf("tcp://%s:%s", "host.docker.internal", "2375")
...
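UpdateFiledInfo gathers the derived-field logic that initConfig used to run inline, so the reload callback above can rerun it on every config change. A sketch of its key-derivation step using the same go-ethereum helpers; a freshly generated key stands in for the keystore key that utils.GetPrv loads:

```go
package main

import (
	"fmt"

	"github.com/ethereum/go-ethereum/common"
	"github.com/ethereum/go-ethereum/crypto"
)

func main() {
	prvKey, err := crypto.GenerateKey() // stand-in for utils.GetPrv()
	if err != nil {
		panic(err)
	}
	pub := prvKey.PublicKey
	// 65-byte uncompressed public key, hex-encoded (the SignPub field)
	signPub := common.Bytes2Hex(crypto.FromECDSAPub(&pub))
	// Ethereum address: last 20 bytes of keccak256(pubkey[1:])
	addr := crypto.PubkeyToAddress(pub)
	fmt.Println("public key:", signPub)
	fmt.Println("address:", addr.Hex())
}
```

One detail carried over unchanged from the old initConfig: the millisecond conversion multiplies by 60 * 60 * 1000, which treats HeartRespTimeSecond as hours rather than seconds despite its name.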
@@ -3,8 +3,7 @@
 	"api_url": "https://aigic.ai/admin/api/task/taskheat",
 	"node_manager_num": 1,
 	"heart_response": 60,
-	"task_validator_time": 1,
-	"container_num": 1,
+	"share_gpu_memory_usage": 80,
 	"chain_id": 100,
 	"validator_url": "43.198.29.144:20011",
 	"oss_url": "https://tmp-file.aigic.ai/api/v1/upload",
...
@@ -12,7 +12,7 @@ var err error
 func init() {
 	// open or create a LevelDB database
-	dbInstance, err = leveldb.OpenFile("mydb", nil)
+	dbInstance, err = leveldb.OpenFile("data/mydb", nil)
 	if err != nil {
 		log.Error("Leveldb open file failed: ", err)
 	}
...
@@ -83,12 +83,13 @@ func (m *ModelHandler) MonitorModelInfo() {
 			}
 		} else {
 			log.WithField("name", modelInfo.ImageName).Info("The image name is already")
+			m.dockerOp.BootUpModelId[modelInfo.ImageName] = modelInfo.TaskId
 			reportTaskIds = append(reportTaskIds, modelInfo.TaskId)
 		}
 		m.dockerOp.SignApi[modelInfo.ImageName] = modelInfo.SignUrl
 	}
 	m.dockerOp.ModelsInfo = modelInfosResp
-	m.dockerOp.ReportTaskIds = reportTaskIds
+	m.dockerOp.ReportModelIds = reportTaskIds
 	err = os.WriteFile(m.modelsFileName, bodyBytes, 0644)
 	if err != nil {
 		log.WithError(err).Error("Error writing models.json")
...
@@ -75,6 +75,8 @@ type ModelInfo struct {
 	//OutPutJson string `json:"out_put_json"`
 	FileExpiresTime string `json:"file_expires_time"`
 	PublishStatus   int    `json:"publish_status"`
+	EstimatExeTime  int64  `json:"estimat_exe_time"`
+	StartUpTime     int64  `json:"start_up_time"`
 }

 type HealthyCheck struct {
...
@@ -57,8 +57,8 @@ func (m *MonitorNm) monitorNmClient() {
 			msgRespWorker.RegisterMsgResp(nodeManager, worker, DeviceInfoResp, nil)
 			log.Info("------------------------Send deviceInfo message ended------------------------")
-			if len(m.DockerOp.ReportTaskIds) == 0 {
-				params := utils.BuildParams(m.DockerOp.ReportTaskIds)
+			if len(m.DockerOp.ReportModelIds) == 0 {
+				params := utils.BuildParams(m.DockerOp.ReportModelIds)
 				msgRespWorker.RegisterMsgResp(nodeManager, worker, SubmitResourceMapRes, params)
 			}
 			log.Info("------------------------Send once-off message ended------------------------")
...
@@ -37,7 +37,6 @@ func (n *NodeManagerHandler) DistributionMsgWorker(nodeManagerMsgChan chan *node
 			log.Warn("handlerMsg -> node manager is not running")
 			return
 		}
-
 		heartbeatReq := rev.GetHeartbeatRequest()
 		if heartbeatReq != nil {
 			n.nodeManager.UpdateLastHeartTime(time.Now())
@@ -49,15 +48,10 @@ func (n *NodeManagerHandler) DistributionMsgWorker(nodeManagerMsgChan chan *node
 		taskMsg := rev.GetPushTaskMessage()
 		if taskMsg != nil {
-			params := utils.BuildParams(taskMsg.TaskId)
-			n.msgRespWorker.RegisterMsgResp(n.nodeManager, n.worker, RespTaskAck, params)
-			go func(msgRespWorker *RespMsgWorker,
-				taskMsgWorker *TaskWorker, taskMsg *nodeManagerV1.PushTaskMessage) {
-				if !taskMsgWorker.DockerOp.IsHealthy {
-					//params := utils.BuildParams(taskMsgWorker.DockerOp.Reason)
-					//msgRespWorker.RegisterMsgResp(nodeManager, worker, GoodbyeResp, params)
-					return
-				}
+			go func(msgRespWorker *RespMsgWorker, taskMsgWorker *TaskWorker, taskMsg *nodeManagerV1.PushTaskMessage) {
+				isCanExecute, bootUpTime, queueWaitTime, executeTime := taskMsgWorker.GetAckResp(taskMsg)
+				ackParams := utils.BuildParams(taskMsg.TaskId, isCanExecute, bootUpTime, queueWaitTime, executeTime)
+				msgRespWorker.RegisterMsgResp(n.nodeManager, n.worker, RespTaskAck, ackParams)
 				taskMsgWorker.Wg.Add(1)
 				taskMsgWorker.TaskMsg <- taskMsg
 				taskMsgWorker.Wg.Wait()
@@ -88,15 +82,13 @@ func (n *NodeManagerHandler) DistributionMsgWorker(nodeManagerMsgChan chan *node
 					taskExecRes.TaskExecError = fmt.Sprintf("worker:%s-%s-%s", conf.GetConfig().SignPublicAddress.Hex(), "Task exec error", taskExecRes.TaskExecError)
 				}
 				reqHash, respHash, minerSign := taskMsgWorker.GetMinerSign(taskMsg, taskExecRes.TaskRespBody)
-				params := utils.BuildParams(taskMsg.TaskId, containerSign, minerSign, taskExecRes, isSuccess)
-				taskMsgWorker.Mutex.Lock()
+				taskResultParams := utils.BuildParams(taskMsg.TaskId, containerSign, minerSign, taskExecRes, isSuccess)
 				taskMsgWorker.LruCache.Add(taskMsg.TaskId+models.TaskType, taskMsg.TaskType)
 				taskMsgWorker.LruCache.Add(taskMsg.TaskId+models.ContainerSign, containerSign)
 				taskMsgWorker.LruCache.Add(taskMsg.TaskId+models.MinerSign, minerSign)
 				taskMsgWorker.LruCache.Add(taskMsg.TaskId+models.ReqHash, reqHash)
 				taskMsgWorker.LruCache.Add(taskMsg.TaskId+models.RespHash, respHash)
-				taskMsgWorker.Mutex.Unlock()
-				msgRespWorker.RegisterMsgResp(n.nodeManager, n.worker, SubmitResultResp, params)
+				msgRespWorker.RegisterMsgResp(n.nodeManager, n.worker, SubmitResultResp, taskResultParams)
 				log.Info("--------------taskMsg--------------:", taskMsg)
 			}(n.msgRespWorker, n.taskMsgWorker, taskMsg)
 			continue
@@ -184,8 +176,20 @@ func (n *NodeManagerHandler) ReportResourceMap(dockerOp *operate.DockerOp) {
 	for {
 		select {
 		case <-ticker.C:
-			if len(dockerOp.ReportTaskIds) > 0 {
-				params := utils.BuildParams(dockerOp.ReportTaskIds)
+			if len(dockerOp.ReportModelIds) > 0 {
+				bootUpModelIds := make([]uint64, 0)
+				containers := dockerOp.ListContainer()
+				if containers != nil && len(containers) > 0 {
+					for _, container := range containers {
+						if container.State == "running" {
+							taskId := dockerOp.BootUpModelId[container.Image]
+							if taskId != 0 {
+								bootUpModelIds = append(bootUpModelIds, taskId)
+							}
+						}
+					}
+				}
+				params := utils.BuildParams(dockerOp.ReportModelIds, bootUpModelIds)
 				n.msgRespWorker.RegisterMsgResp(n.nodeManager, n.worker, SubmitResourceMapRes, params)
 				ticker = time.NewTicker(time.Minute * 10)
 			}
...
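ReportResourceMap now sends two ID lists: every model the node can serve (ReportModelIds) and the subset whose containers are currently running (bootUpModelIds). SubmitResourceMapRes, in the next file, packs each list into a bitmap. A minimal sketch of that encoding with the same libnetwork bitmap package (the IDs are sample values):

```go
package main

import (
	"fmt"

	"github.com/docker/docker/libnetwork/bitmap"
)

func main() {
	reportModelIds := []uint64{3, 17, 42} // models available on this node
	b := bitmap.New(1000000000)           // same capacity the commit uses
	for _, id := range reportModelIds {
		if err := b.Set(id); err != nil { // flip the bit at the model's index
			panic(err)
		}
	}
	// serialized bitmap bytes, sent as the ResourceMap / BootupMap fields
	bin, err := b.MarshalBinary()
	if err != nil {
		panic(err)
	}
	fmt.Printf("encoded %d ids into %d bytes\n", len(reportModelIds), len(bin))
}
```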
 package nm

 import (
+	"bytes"
+	"encoding/binary"
 	"example.com/m/conf"
 	"example.com/m/log"
 	"example.com/m/models"
 	"fmt"
 	"github.com/docker/docker/libnetwork/bitmap"
+	"github.com/ethereum/go-ethereum/common"
+	"github.com/ethereum/go-ethereum/crypto"
 	nodemanagerV1 "github.com/odysseus/odysseus-protocol/gen/proto/go/nodemanager/v1"
 	"github.com/shirou/gopsutil/cpu"
 	"strconv"
+	"time"
 )

 type WorkerMsgHandler func(params ...interface{}) *nodemanagerV1.WorkerMessage
@@ -77,26 +82,45 @@ func HeartbeatResp(params ...interface{}) *nodemanagerV1.WorkerMessage {
 func SubmitResourceMapRes(params ...interface{}) *nodemanagerV1.WorkerMessage {
 	log.Info("Submit resource map response received params: ", params)
-	taskIdIndexes := params[0].([]uint64)
-	b := bitmap.New(1000000000)
-	for i := 0; i < len(taskIdIndexes); i++ {
-		taskIdIndex := taskIdIndexes[i]
-		err := b.Set(taskIdIndex)
+	existTaskIdIndexes := params[0].([]uint64)
+	existMap := bitmap.New(1000000000)
+	for i := 0; i < len(existTaskIdIndexes); i++ {
+		taskIdIndex := existTaskIdIndexes[i]
+		err := existMap.Set(taskIdIndex)
 		if err != nil {
 			log.WithField("taskId index", taskIdIndex).WithField("error", err).Error("Error setting task id index")
 			return nil
 		}
 	}
-	binary, err := b.MarshalBinary()
+	existImage, err := existMap.MarshalBinary()
 	if err != nil {
 		log.Error("bitmap marshal binary failed with error: ", err)
 		return nil
 	}
-	log.WithField("", binary).Info("Bit map binary byte")
+	bootUpModelIdIndexes := params[1].([]uint64)
+	bootUpMap := bitmap.New(1000000000)
+	for i := 0; i < len(bootUpModelIdIndexes); i++ {
+		taskIdIndex := bootUpModelIdIndexes[i]
+		err := bootUpMap.Set(taskIdIndex)
+		if err != nil {
+			log.WithField("taskId index", taskIdIndex).WithField("error", err).Error("Error setting task id index")
+			return nil
+		}
+	}
+	bootUpImage, err := bootUpMap.MarshalBinary()
+	if err != nil {
+		log.Error("bitmap marshal binary failed with error: ", err)
+		return nil
+	}
+	log.WithField("", existImage).Info("Bit map binary byte")
 	heartRes := &nodemanagerV1.WorkerMessage{
 		Message: &nodemanagerV1.WorkerMessage_ResourceMap{
 			ResourceMap: &nodemanagerV1.SubmitResourceMap{
-				ResourceMap: binary,
+				ResourceMap: existImage,
+				BootupMap:   bootUpImage,
 			},
 		},
 	}
@@ -106,12 +130,24 @@ func SubmitResourceMapRes(params ...interface{}) *nodemanagerV1.WorkerMessage {
 func RegisterInfoResp(params ...interface{}) *nodemanagerV1.WorkerMessage {
 	log.Info("Register info response received params:", params)
+	nowTimeStamp := time.Now().Unix()
+	byteSlice := make([]byte, 8)
+	binary.BigEndian.PutUint64(byteSlice, uint64(nowTimeStamp))
+	signHash := crypto.Keccak256Hash(bytes.NewBufferString(conf.GetConfig().GetExternalIp()).Bytes(),
+		bytes.NewBufferString(conf.GetConfig().SignPub).Bytes(),
+		bytes.NewBufferString(conf.GetConfig().BenefitAddress).Bytes(),
+		byteSlice)
+	log.WithField("hash", signHash.String()).Info("register message sign result")
+	sign, _ := crypto.Sign(signHash.Bytes(), conf.GetConfig().SignPrivateKey)
+	log.Info("register message sign:", common.Bytes2Hex(sign))
 	nodeInfoRes := &nodemanagerV1.WorkerMessage{
 		Message: &nodemanagerV1.WorkerMessage_RegisteMessage{
 			RegisteMessage: &nodemanagerV1.RegisteMessage{
 				DeviceIp:       conf.GetConfig().GetExternalIp(),
 				MinerPubkey:    conf.GetConfig().SignPub,
 				BenefitAddress: conf.GetConfig().BenefitAddress,
+				Timestamp:       nowTimeStamp,
+				DeviceSignature: sign,
 			},
 		},
 	}
@@ -252,10 +288,18 @@ func FetchStandardTaskResp(params ...interface{}) *nodemanagerV1.WorkerMessage {
 func RespTaskAck(params ...interface{}) *nodemanagerV1.WorkerMessage {
 	taskId := params[0].(string)
+	canExecute := params[1].(bool)
+	bootUpTime := params[2].(int64)
+	queueWaitTime := params[3].(int64)
+	executeTime := params[4].(int64)
 	taskAckMsgRes := &nodemanagerV1.WorkerMessage{
 		Message: &nodemanagerV1.WorkerMessage_SubmitTaskAck{
 			SubmitTaskAck: &nodemanagerV1.SubmitTaskAck{
 				TaskId:        taskId,
+				CanExecute:    canExecute,
+				BootUpTime:    bootUpTime,
+				QueueWaitTime: queueWaitTime,
+				ExecuteTime:   executeTime,
 			},
 		},
 	}
...
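RegisterInfoResp now authenticates the registration: it hashes the device IP, miner public key, benefit address, and a big-endian timestamp with Keccak-256, then signs the digest with the miner's ECDSA key so the node manager can recover and verify the sender. A self-contained sketch of the same scheme; the field values are placeholders for what the repo reads from conf.GetConfig():

```go
package main

import (
	"encoding/binary"
	"fmt"
	"time"

	"github.com/ethereum/go-ethereum/common"
	"github.com/ethereum/go-ethereum/crypto"
)

func main() {
	prvKey, _ := crypto.GenerateKey() // stand-in for the keystore key
	ts := make([]byte, 8)
	binary.BigEndian.PutUint64(ts, uint64(time.Now().Unix()))
	signHash := crypto.Keccak256Hash(
		[]byte("203.0.113.7"),   // DeviceIp (placeholder)
		[]byte("04deadbeef"),    // MinerPubkey hex (placeholder)
		[]byte("0x1234abcd"),    // BenefitAddress (placeholder)
		ts,
	)
	sign, err := crypto.Sign(signHash.Bytes(), prvKey) // 65-byte [R || S || V]
	if err != nil {
		panic(err)
	}
	fmt.Println("signature:", common.Bytes2Hex(sign))
	// the receiver can recover the signer from the same digest:
	pub, _ := crypto.SigToPub(signHash.Bytes(), sign)
	fmt.Println("recovered address:", crypto.PubkeyToAddress(*pub).Hex())
}
```

Worth noting: the diff discards the error from crypto.Sign, so a nil SignPrivateKey would yield an empty signature without any log.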
@@ -26,17 +26,16 @@ import (
 )

 type TaskWorker struct {
-	Wg                  *sync.WaitGroup
-	Mutex               *sync.Mutex
-	LruCache            *lru.Cache
-	DockerOp            *operate.DockerOp
-	CmdOp               *operate.Command
-	TaskMsg             chan *nodeManagerV1.PushTaskMessage
-	IsExecAiTask        bool
-	IsExecStandardTask  bool
-	ExecTaskIdIsSuccess *sync.Map
-	oldTaskImageName    string
-	oldTaskId           string
+	Wg                                    *sync.WaitGroup
+	LruCache                              *lru.Cache
+	DockerOp                              *operate.DockerOp
+	CmdOp                                 *operate.Command
+	TaskMsg                               chan *nodeManagerV1.PushTaskMessage
+	IsExecAiTask                          bool
+	IsExecStandardTask                    bool
+	ExecTaskIdIsFinished                  *sync.Map
+	lastExecTaskId, lastExecTaskImageName string
+	lastExecTaskStartTime                 time.Time
 }

 type TaskOp struct {
@@ -52,13 +51,12 @@ type TaskOp struct {
 func NewTaskWorker(op *operate.DockerOp) *TaskWorker {
 	return &TaskWorker{
-		Wg:                  &sync.WaitGroup{},
-		Mutex:               &sync.Mutex{},
-		LruCache:            lru.New(100),
-		DockerOp:            op,
-		TaskMsg:             make(chan *nodeManagerV1.PushTaskMessage, 0),
-		IsExecAiTask:        false,
-		ExecTaskIdIsSuccess: &sync.Map{},
+		Wg:                   &sync.WaitGroup{},
+		LruCache:             lru.New(100),
+		DockerOp:             op,
+		TaskMsg:              make(chan *nodeManagerV1.PushTaskMessage, 0),
+		IsExecAiTask:         false,
+		ExecTaskIdIsFinished: &sync.Map{},
 	}
 }
@@ -126,6 +124,7 @@ func (t *TaskWorker) CustomTaskHandler(taskMsg *nodeManagerV1.PushTaskMessage) {
 func (t *TaskWorker) ComputeTaskHandler(taskMsg *nodeManagerV1.PushTaskMessage) {
 	defer t.Wg.Done()
+	t.lastExecTaskStartTime = time.Now()
 	t.checkLastTaskExecStatus(taskMsg)
 	log.Info("check last task exec status successful")
 	taskOp := &TaskOp{
@@ -150,12 +149,12 @@ func (t *TaskWorker) ComputeTaskHandler(taskMsg *nodeManagerV1.PushTaskMessage)
 	if err != nil {
 		log.Errorf("failed to unmarshal task cmd: %s", err.Error())
 		taskOp.taskExecResult.TaskExecError = fmt.Sprintf("%s,%s", "failed to unmarshal task cmd: %s", err.Error())
-		t.ExecTaskIdIsSuccess.Store(taskMsg.TaskId, true)
+		t.ExecTaskIdIsFinished.Store(taskMsg.TaskId, true)
 		return
 	}
 	taskOp.taskCmd.ImageName = fmt.Sprintf("%s-%s", taskOp.taskCmd.ImageName, conf.GetConfig().OpSys)
 	log.Info("received task cmd :", taskOp.taskCmd)
-	log.WithField("t.oldTaskImageName", t.oldTaskImageName).WithField("newTaskImageName", taskOp.taskCmd.ImageName).Info("task image info")
+	log.WithField("t.lastExecTaskImageName", t.lastExecTaskImageName).WithField("newTaskImageName", taskOp.taskCmd.ImageName).Info("task image info")
 	if taskMsg.TaskKind != baseV1.TaskKind_StandardTask && conf.GetConfig().IsStopLastContainer {
 		t.checkIsStopContainer(taskOp.taskCmd)
 	}
@@ -165,14 +164,14 @@ func (t *TaskWorker) ComputeTaskHandler(taskMsg *nodeManagerV1.PushTaskMessage)
 	if imageId == "" {
 		log.Error("The image is not found:", taskOp.taskCmd.ImageName)
 		taskOp.taskExecResult.TaskExecError = fmt.Sprintf("%s,%s", "The image is not found:", taskOp.taskCmd.ImageName)
-		t.ExecTaskIdIsSuccess.Store(taskMsg.TaskId, true)
+		t.ExecTaskIdIsFinished.Store(taskMsg.TaskId, true)
 		return
 	}
 	err = json.Unmarshal(taskMsg.TaskParam, taskOp.taskParam)
 	if err != nil {
 		log.WithField("err", err).Error("Error unmarshalling task parameter")
 		taskOp.taskExecResult.TaskExecError = fmt.Sprintf("%s,%s", "Error unmarshalling task parameter", err.Error())
-		t.ExecTaskIdIsSuccess.Store(taskMsg.TaskId, true)
+		t.ExecTaskIdIsFinished.Store(taskMsg.TaskId, true)
 		return
 	}
 	running, _, _ := t.foundImageIsRunning(imageId)
@@ -183,19 +182,19 @@ func (t *TaskWorker) ComputeTaskHandler(taskMsg *nodeManagerV1.PushTaskMessage)
 		if err != nil {
 			log.Errorf("Create and start container failed: %s", err.Error())
 			taskOp.taskExecResult.TaskExecError = fmt.Sprintf("%s,%s", "Create and start container failed", err.Error())
-			t.ExecTaskIdIsSuccess.Store(taskMsg.TaskId, true)
+			t.ExecTaskIdIsFinished.Store(taskMsg.TaskId, true)
 			return
 		}
 		log.Infof("Started container with ID %s", containerId)
 	}
 	if err = taskOp.waitContainerRunning(t, imageId); err != nil {
 		taskOp.taskExecResult.TaskExecError = fmt.Sprintf("%s", err.Error())
-		t.ExecTaskIdIsSuccess.Store(taskMsg.TaskId, true)
+		t.ExecTaskIdIsFinished.Store(taskMsg.TaskId, true)
 		return
 	}
 	if err = taskOp.waitReqContainerOk(t.DockerOp); err != nil {
 		taskOp.taskExecResult.TaskExecError = fmt.Sprintf("%s", err.Error())
-		t.ExecTaskIdIsSuccess.Store(taskMsg.TaskId, true)
+		t.ExecTaskIdIsFinished.Store(taskMsg.TaskId, true)
 		return
 	}
 	endAfterTaskTime := time.Since(taskOp.startBeforeTaskTime)
@@ -206,11 +205,44 @@ func (t *TaskWorker) ComputeTaskHandler(taskMsg *nodeManagerV1.PushTaskMessage)
 	} else if taskMsg.TaskKind == baseV1.TaskKind_StandardTask {
 		t.IsExecStandardTask = false
 	}
-	t.ExecTaskIdIsSuccess.Store(taskMsg.TaskId, true)
+	t.ExecTaskIdIsFinished.Store(taskMsg.TaskId, true)
 	//log.WithField("result", taskExecResult).Info("lru cache storage task result")
 	log.Info("----------------------Compute task exec done--------------------------------")
 }
+func (t *TaskWorker) GetAckResp(taskMsg *nodeManagerV1.PushTaskMessage) (isCanExecute bool, bootUpTime, queueWaitTime, executeTime int64) {
+	if t.IsExecStandardTask {
+		isCanExecute = true
+		return
+	}
+	taskCmd := &models.TaskCmd{}
+	err := json.Unmarshal(bytes.NewBufferString(taskMsg.TaskCmd).Bytes(), taskCmd)
+	if err != nil {
+		log.Errorf("failed to unmarshal task cmd: %s", err.Error())
+		return
+	}
+	value, ok := t.ExecTaskIdIsFinished.Load(t.lastExecTaskId)
+	if !ok {
+		log.WithField("task id", t.lastExecTaskId).Warn("task exec is not finished")
+		return
+	}
+	isSuccess := value.(bool)
+	log.WithField("isSuccess", isSuccess).Info("Task exec info")
+	if !isSuccess && !t.lastExecTaskStartTime.IsZero() {
+		lastTaskImageInfo := t.DockerOp.GetImageInfo(t.lastExecTaskImageName)
+		since := time.Since(t.lastExecTaskStartTime)
+		queueWaitTime = lastTaskImageInfo.EstimatExeTime - int64(since.Seconds())
+		if queueWaitTime < 0 {
+			queueWaitTime = lastTaskImageInfo.EstimatExeTime
+		}
+	}
+	isCanExecute = true
+	modelInfo := t.DockerOp.GetImageInfo(taskCmd.ImageName)
+	bootUpTime = modelInfo.StartUpTime
+	executeTime = modelInfo.EstimatExeTime
+	return
+}
 func (t *TaskWorker) foundTaskImage(taskCmd *models.TaskCmd) (imageId string) {
 	images, err := t.DockerOp.PsImages()
 	if err != nil {
@@ -267,29 +299,29 @@ func (t *TaskWorker) checkLastTaskExecStatus(taskMsg *nodeManagerV1.PushTaskMess
 	} else if taskMsg.TaskKind == baseV1.TaskKind_StandardTask {
 		t.IsExecStandardTask = true
 	}
-	if t.oldTaskId != taskMsg.TaskId {
+	if t.lastExecTaskId != taskMsg.TaskId {
 		now := time.Now()
 		for {
 			since := time.Since(now)
 			if int64(since.Seconds()) > conf.GetConfig().WaitLastTaskExecTime {
-				log.WithField("taskId", t.oldTaskId).Info("Waiting for last task execution ending")
-				t.oldTaskId = taskMsg.TaskId
+				log.WithField("taskId", t.lastExecTaskId).Info("Waiting for last task execution ending")
+				t.lastExecTaskId = taskMsg.TaskId
 				break
 			}
-			if t.oldTaskId == "" {
-				t.oldTaskId = taskMsg.TaskId
+			if t.lastExecTaskId == "" {
+				t.lastExecTaskId = taskMsg.TaskId
 				break
 			}
-			value, ok := t.ExecTaskIdIsSuccess.Load(t.oldTaskId)
+			value, ok := t.ExecTaskIdIsFinished.Load(t.lastExecTaskId)
 			//log.WithField("isSuccess", value).Info("Task id exec info")
 			if !ok {
-				//log.WithField("task id", t.oldTaskId).Warn("task exec is not finished")
+				//log.WithField("task id", t.lastExecTaskId).Warn("task exec is not finished")
 				continue
 			}
 			isSuccess := value.(bool)
 			if isSuccess {
-				t.oldTaskId = taskMsg.TaskId
-				log.WithField("taskId", t.oldTaskId).Info("Task exec success")
+				t.lastExecTaskId = taskMsg.TaskId
+				log.WithField("taskId", t.lastExecTaskId).Info("Task exec success")
 				break
 			}
 		}
@@ -297,7 +329,7 @@ func (t *TaskWorker) checkLastTaskExecStatus(taskMsg *nodeManagerV1.PushTaskMess
 }

 func (t *TaskWorker) checkIsStopContainer(taskCmd *models.TaskCmd) {
-	if t.oldTaskImageName != "" && t.oldTaskImageName != taskCmd.ImageName {
+	if t.lastExecTaskImageName != "" && t.lastExecTaskImageName != taskCmd.ImageName {
 		//todo: stop the standard-task container
 		containers := t.DockerOp.ListContainer()
 		for _, container := range containers {
@@ -305,17 +337,17 @@ func (t *TaskWorker) checkIsStopContainer(taskCmd *models.TaskCmd) {
 			if len(split) == 1 {
 				container.Image = fmt.Sprintf("%s:%s", container.Image, "latest")
 			}
-			log.WithField("containerImageName", container.Image).WithField("t.oldTaskImageName", t.oldTaskImageName).Info("match image")
-			if container.Image == t.oldTaskImageName && container.State == "running" {
+			log.WithField("containerImageName", container.Image).WithField("t.lastExecTaskImageName", t.lastExecTaskImageName).Info("match image")
+			if container.Image == t.lastExecTaskImageName && container.State == "running" {
 				t.DockerOp.StopContainer(container.ID)
 				log.WithField("Image name", container.Image).Info("Stopping container")
-				//t.DockerOp.RunningImages[t.oldTaskImageName] = false
+				//t.DockerOp.RunningImages[t.lastExecTaskImageName] = false
 				break
 			}
 		}
-		t.oldTaskImageName = taskCmd.ImageName
+		t.lastExecTaskImageName = taskCmd.ImageName
 	} else {
-		t.oldTaskImageName = taskCmd.ImageName
+		t.lastExecTaskImageName = taskCmd.ImageName
 	}
 }
...
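GetAckResp is what feeds the new ack fields: it estimates how long a task will queue behind the one still running by subtracting the elapsed time from the model's EstimatExeTime, falling back to the full estimate once the running task has overrun it. The arithmetic, isolated as a runnable sketch (the function name and inputs are illustrative):

```go
package main

import (
	"fmt"
	"time"
)

// queueWait mirrors GetAckResp's estimate: remaining seconds of the
// still-running task, or the full estimate if it has already overrun.
func queueWait(estimatExeTime int64, lastStart time.Time) int64 {
	remaining := estimatExeTime - int64(time.Since(lastStart).Seconds())
	if remaining < 0 {
		// the running task exceeded its estimate; report a full window
		remaining = estimatExeTime
	}
	return remaining
}

func main() {
	start := time.Now().Add(-30 * time.Second) // current task started 30s ago
	fmt.Println("queue wait:", queueWait(120, start), "s") // prints 90
}
```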
@@ -29,7 +29,8 @@ type DockerOp struct {
 	UsedExternalPort map[int64]bool
 	SignApi          map[string]string
 	ModelsInfo       []*models.ModelInfo
-	ReportTaskIds    []uint64
+	ReportModelIds   []uint64
+	BootUpModelId    map[string]uint64
 	//RunningImages map[string]bool
 }
@@ -52,11 +53,21 @@ func NewDockerOp() *DockerOp {
 		SignApi:          make(map[string]string, 0),
 		ModelsInfo:       make([]*models.ModelInfo, 1000),
 		UsedExternalPort: make(map[int64]bool, 0),
-		ReportTaskIds:    make([]uint64, 0),
+		ReportModelIds:   make([]uint64, 0),
+		BootUpModelId:    make(map[string]uint64, 0),
 		//RunningImages: make(map[string]bool, 0),
 	}
 }

+func (d *DockerOp) GetImageInfo(imageName string) *models.ModelInfo {
+	for _, info := range d.ModelsInfo {
+		if info.ImageName == imageName {
+			return info
+		}
+	}
+	return nil
+}
+
 func (d *DockerOp) GetContainerSign(taskMsg *nodemanagerv1.PushTaskMessage, taskRes []byte) []byte {
 	reqBody := &models.TaskReq{
 		TaskId: taskMsg.TaskId,
...
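GetImageInfo walks ModelsInfo linearly. One thing to watch: NewDockerOp builds that slice with make([]*models.ModelInfo, 1000), so it starts out holding 1000 nil pointers, and dereferencing info.ImageName on an empty slot would panic until MonitorModelInfo replaces the slice. A defensive variant of the lookup, using illustrative local types rather than the repo's:

```go
package main

import "fmt"

type ModelInfo struct {
	ImageName string
}

// getImageInfo is a nil-safe version of the linear scan in DockerOp.GetImageInfo.
func getImageInfo(modelsInfo []*ModelInfo, imageName string) *ModelInfo {
	for _, info := range modelsInfo {
		if info == nil {
			continue // skip preallocated empty slots
		}
		if info.ImageName == imageName {
			return info
		}
	}
	return nil
}

func main() {
	infos := make([]*ModelInfo, 1000) // mirrors NewDockerOp's preallocation
	infos[3] = &ModelInfo{ImageName: "sdxl-linux"}
	if m := getImageInfo(infos, "sdxl-linux"); m != nil {
		fmt.Println("found:", m.ImageName)
	}
}
```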