Commit a62b4c2e authored by duanjinfei

update msg resp

parent 14cdac1e
......@@ -20,13 +20,15 @@ func (c *StateController) GetRunningState() {
func (c *StateController) GetRunningTp() {
info := utils.GetHardwareInfo()
var totalTemp int64
for _, gpu := range info.Data.Gpus {
totalTemp += gpu.Temp
if len(info.Data.Gpus) > 0 {
var totalTemp int64
for _, gpu := range info.Data.Gpus {
totalTemp += gpu.Temp
}
avgTemp := totalTemp / int64(len(info.Data.Gpus))
c.ResponseInfo(200, "get running state successful", avgTemp)
}
avgTemp := totalTemp / int64(len(info.Data.Gpus))
c.ResponseInfo(200, "get running state successful", avgTemp)
c.ResponseInfo(500, "get running tp failed", 0)
}
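
In the new GetRunningTp, the 200 response inside the length check is not followed by a return, so execution falls through to the 500 response as well; if ResponseInfo does not stop the request on its own, an early return keeps the two outcomes exclusive. A minimal sketch of the guard-then-average shape (hypothetical helper, not part of the commit):

    func avgGpuTemp(temps []int64) (int64, bool) {
        if len(temps) == 0 {
            return 0, false // nothing to average; the caller can answer 500 here
        }
        var total int64
        for _, t := range temps {
            total += t
        }
        return total / int64(len(temps)), true
    }
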
func (c *StateController) GetWorkerInfo() {
......@@ -78,10 +80,12 @@ func (c *StateController) GetOtherHardwareInfo() {
if point == "/" {
diskTotal += disk.SizeBytes
diskFree += disk.FreeBytes
break
}
}
}
diskUsage := int32((1 - diskFree/diskTotal) * 100)
diskUsage := int32((1 - float64(diskFree)/float64(diskTotal)) * 100)
res := &models.OtherHardwareInfoResp{
NodeID: conf.GetConfig().SignPublicAddress.Hex(),
CpuName: info.Data.Cpus.Model,
......
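
The disk-usage change above swaps integer division for float division; with whole-byte counters the old expression could only ever yield 0% or 100%. A small standalone illustration (made-up values, int64 fields assumed as the diff suggests):

    package main

    import "fmt"

    func main() {
        free, total := int64(200<<30), int64(500<<30) // 200 GiB free of 500 GiB
        fmt.Println(int32((1 - free/total) * 100))                   // 100: free/total truncates to 0
        fmt.Println(int32((1 - float64(free)/float64(total)) * 100)) // 60: the intended percentage
    }
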
......@@ -12,12 +12,15 @@ var dbInstance *leveldb.DB
var err error
var modelKeys map[string]bool
func init() {
// Open (or create) the LevelDB database
dbInstance, err = leveldb.OpenFile("data/mydb", nil)
if err != nil {
log.Error("Leveldb open file failed: ", err)
}
modelKeys = make(map[string]bool, 0)
}
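
modelKeys lives only in memory, so models written through PutModel before a restart will not be returned by GetAllModels until they are stored again. If that matters, the map could be rebuilt from LevelDB right after opening it; a sketch, assuming model records can be distinguished from the other keys in the same database (the proof records written via db.Put share it, so a key prefix would likely be needed):

    iter := dbInstance.NewIterator(nil, nil)
    for iter.Next() {
        modelKeys[string(iter.Key())] = true // or filter on a model-specific key prefix
    }
    iter.Release()
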
func Put(key string, value any) error {
......@@ -55,6 +58,33 @@ func Get(key string) ([]byte, error) {
return data, nil
}
func PutModel(key string, value any) error {
valueByte, err := json.Marshal(value)
if err != nil {
log.Error("Leveldb put data failed:", err)
return err
}
// Store the data
err = dbInstance.Put([]byte(key), valueByte, nil)
if err != nil {
log.Error("Leveldb put data failed:", err)
return err
}
modelKeys[key] = true
return nil
}
func GetAllModels() ([]*models.ModelInfo, error) {
res := make([]*models.ModelInfo, 0)
for key := range modelKeys {
model, _ := GetModel(key)
if model != nil {
res = append(res, model)
}
}
return res, nil
}
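
PutModel and GetAllModels mutate and range over the plain modelKeys map, and elsewhere in this commit they are called from several goroutines at once (the MonitorModelStatus ticker plus the per-operation goroutines), which Go maps do not tolerate. One hedged option is a small guarded wrapper (needs the sync import):

    var (
        modelKeysMu sync.RWMutex
        modelKeys   = make(map[string]bool)
    )

    func rememberModelKey(key string) {
        modelKeysMu.Lock()
        modelKeys[key] = true
        modelKeysMu.Unlock()
    }

    func modelKeySnapshot() []string {
        modelKeysMu.RLock()
        defer modelKeysMu.RUnlock()
        keys := make([]string, 0, len(modelKeys))
        for k := range modelKeys {
            keys = append(keys, k)
        }
        return keys
    }
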
func GetModel(key string) (*models.ModelInfo, error) {
data, err := dbInstance.Get([]byte(key), nil)
if err != nil {
......
......@@ -11,25 +11,20 @@ import (
nodemanagerV2 "github.com/odysseus/odysseus-protocol/gen/proto/go/nodemanager/v2"
"io"
"net/http"
"os"
"strconv"
"strings"
"time"
)
type ModelHandler struct {
dockerOp *operate.DockerOp
client *http.Client
modelsFileName string
hotModel map[string]bool
popularModel map[string]bool
dockerOp *operate.DockerOp
client *http.Client
}
func NewModelHandler(dockerOp *operate.DockerOp) *ModelHandler {
return &ModelHandler{
dockerOp: dockerOp,
client: &http.Client{},
modelsFileName: "models.json",
dockerOp: dockerOp,
client: &http.Client{},
}
}
......@@ -74,7 +69,11 @@ func (m *ModelHandler) MonitorModelInfo() {
}
log.WithField("name", modelInfo.ImageName).Info("The image name is already")
m.dockerOp.SignApi[modelInfo.ImageName] = modelInfo.SignUrl
err := db.Put(modelInfo.ImageName, modelInfo)
model, _ := db.GetModel(modelInfo.ImageName)
if model != nil {
model.UpdateFiled(modelInfo)
}
err := db.PutModel(modelInfo.ImageName, modelInfo)
if err != nil {
log.WithError(err).Error("Put db error")
continue
......@@ -85,33 +84,10 @@ func (m *ModelHandler) MonitorModelInfo() {
}
}
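
In the MonitorModelInfo change above, the stored record is fetched and UpdateFiled is called on it, but the value that is then persisted is the incoming modelInfo, so the merged copy is discarded. If the intent is to keep locally tracked state (IsInstalled, ContainerId, SetupTime, and so on) across refreshes, the merge could be written back instead; a sketch of the same loop body under that assumption:

    model, _ := db.GetModel(modelInfo.ImageName)
    if model != nil {
        model.UpdateFiled(modelInfo) // copy only the fields that come from the node manager
        modelInfo = model            // keep the locally tracked install/run state
    }
    if err := db.PutModel(modelInfo.ImageName, modelInfo); err != nil {
        log.WithError(err).Error("Put db error")
        continue
    }
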
func (m *ModelHandler) ReadModels() ([]*models.ModelInfo, error) {
bodyBytes, err := os.ReadFile(m.modelsFileName)
if err != nil {
log.WithError(err).WithField("fileName", m.modelsFileName).Error("Error reading")
return nil, err
}
resp := &models.Resp{}
err = json.Unmarshal(bodyBytes, resp)
if err != nil {
log.WithField("fileName", m.modelsFileName).Error("Unmarshal model response failed:", err)
return nil, err
}
if resp.Code != http.StatusOK {
log.WithField("fileName", m.modelsFileName).Error("Response code :", resp.Code)
return nil, err
}
if resp.Data == nil || len(resp.Data) == 0 {
log.WithField("fileName", m.modelsFileName).Warn("Response data is empty")
return nil, err
}
return resp.Data, nil
}
func (m *ModelHandler) GetRpcModelsResp() (*nodemanagerV2.ModelsInfo, error) {
installedModels := make([]*nodemanagerV2.InstalledModel, 0)
runningModels := make([]*nodemanagerV2.RunningModel, 0)
readModels, err := m.ReadModels()
readModels, err := db.GetAllModels()
if err != nil {
log.WithError(err).Error("Error reading models")
return nil, err
......@@ -144,6 +120,54 @@ func (m *ModelHandler) GetRpcModelsResp() (*nodemanagerV2.ModelsInfo, error) {
return res, nil
}
func (m *ModelHandler) MonitorModelStatus() {
ticker := time.NewTicker(time.Second * 5)
for {
select {
case <-ticker.C:
{
imageList, _ := m.dockerOp.PsImages()
if imageList != nil && len(imageList) > 0 {
for _, image := range imageList {
keys := image.RepoTags
for _, key := range keys {
model, _ := db.GetModel(key)
if model != nil && !model.IsInstalled {
model.SetupTime = time.Now().Unix()
model.IsInstalled = true
err := db.PutModel(key, model)
if err != nil {
continue
}
}
}
}
}
containerList := m.dockerOp.ListContainer()
if containerList != nil && len(containerList) > 0 {
for _, container := range containerList {
key := container.Image
model, err := db.GetModel(key)
if err != nil || model == nil {
continue
}
if container.State == "running" && !model.IsRunning {
model.ContainerId = container.ID
model.LastRunTime = time.Now().Unix()
model.IsRunning = true
err = db.PutModel(key, model)
if err != nil {
continue
}
}
}
}
}
}
}
}
func (m *ModelHandler) isResourceEnough(modelInfo *models.ModelInfo) bool {
return true
}
......
......@@ -23,4 +23,5 @@ const (
DefaultTaskTimer = 2
EncryptionKey = "uxhendjFYsoWFnsO"
HistoryBenefitAddressDirectory = "data/benefitList"
CudaEnv = "CUDA_VISIBLE_DEVICES"
)
......@@ -14,6 +14,7 @@ type TaskCmd struct {
type DockerCmd struct {
ContainerPort int64 `json:"container_port"`
EnvMap map[string]string
HostIp string
HostPort string
}
......@@ -136,16 +137,35 @@ type ModelInfo struct {
Pwd string `json:"pwd"`
SignUrl string `json:"sign_url"`
ImageName string `json:"image_name"`
//OutPutJson string `json:"out_put_json"`
FileExpiresTime string `json:"file_expires_time"`
PublishStatus int `json:"publish_status"`
EstimatExeTime int64 `json:"estimat_exe_time"`
StartUpTime int64 `json:"start_up_time"`
RunningMem int64 `json:"running_mem"`
SetupTime int64 `json:"setup_time"`
LastRunTime int64 `json:"last_run_time"`
IsInstalled bool `json:"is_installed"`
IsRunning bool `json:"is_running"`
FileExpiresTime string `json:"file_expires_time"`
PublishStatus int `json:"publish_status"`
EstimatExeTime int32 `json:"estimat_exe_time"`
StartUpTime int64 `json:"start_up_time"`
RunningMem int64 `json:"running_mem"`
OpTime int64
SetupTime int64
LastRunTime int64
ImageId string
ContainerId string
IsInstalled bool
IsRunning bool
GpuSeq int32
GpuRam int64
LastWorkTime int64
TotalRunCount int32
}
func (m *ModelInfo) UpdateFiled(model *ModelInfo) {
m.Time = model.Time
m.Count = model.Count
m.HardwareRequire = model.HardwareRequire
m.Kind = model.Kind
m.TaskId = model.TaskId
m.User = model.User
m.Pwd = model.Pwd
m.SignUrl = model.SignUrl
m.FileExpiresTime = model.FileExpiresTime
m.PublishStatus = model.PublishStatus
}
type HealthyCheck struct {
......
......@@ -2,12 +2,15 @@ package nm
import (
"example.com/m/conf"
"example.com/m/db"
"example.com/m/largeModel"
"example.com/m/log"
"example.com/m/models"
"example.com/m/operate"
"example.com/m/utils"
"fmt"
nodemanagerV2 "github.com/odysseus/odysseus-protocol/gen/proto/go/nodemanager/v2"
"strconv"
"time"
)
......@@ -36,6 +39,7 @@ func (n *NodeManagerHandler) DistributionMsgWorker(nodeManagerMsgChan chan *node
log.Warn("handlerMsg -> node manager is not running")
return
}
heartbeatReq := rev.GetHeartbeatRequest()
if heartbeatReq != nil {
unix := time.Unix(int64(heartbeatReq.Timestamp), 0)
......@@ -134,7 +138,68 @@ func (n *NodeManagerHandler) DistributionMsgWorker(nodeManagerMsgChan chan *node
modelOpMsg := rev.GetModelOperateRequest()
if modelOpMsg != nil {
modelOpMsg.GetModelOperates()
go func(modelOpMsg *nodemanagerV2.ModelOperateRequest, dockerOp *operate.DockerOp) {
operates := modelOpMsg.GetModelOperates()
for _, modelOp := range operates {
go n.MonitorImageOp(modelOp)
model, err := db.GetModel(modelOp.ImageName)
if err != nil {
log.WithError(err).Error("Op model - get model error")
return
}
switch modelOp.Operate {
case nodemanagerV2.ModelOperateType_INSTALL:
{
go dockerOp.PullImage(modelOp.ImageName)
}
case nodemanagerV2.ModelOperateType_DELETE:
{
if model.ContainerId != "" {
isRunning := dockerOp.ContainerIsRunning(model.ContainerId)
if isRunning {
dockerOp.StopAndDeleteContainer(model.ContainerId)
}
}
go dockerOp.RmImage(modelOp.ImageName)
}
case nodemanagerV2.ModelOperateType_RUN:
{
envMap := make(map[string]string, 0)
dockerCmd := &models.DockerCmd{
EnvMap: envMap,
HostIp: models.ZeroHost,
HostPort: n.taskMsgWorker.getExternalPort(),
}
info := getHardwareInfo()
gpu := info.GPU
isMatch := false
for _, gpuInfo := range gpu {
if gpuInfo.MemFree > model.GpuRam {
envMap[models.CudaEnv] = strconv.FormatInt(int64(gpuInfo.Seq), 10)
isMatch = true
break
}
}
if !isMatch {
}
if isMatch {
_, err := dockerOp.CreateAndStartContainer(model.ImageName, dockerCmd)
if err != nil {
log.WithError(err).Error("Error creating container")
continue
}
}
}
case nodemanagerV2.ModelOperateType_STOP:
{
if model.ContainerId != "" {
dockerOp.StopContainer(model.ContainerId)
}
}
}
}
}(modelOpMsg, n.taskMsgWorker.DockerOp)
continue
}
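
The RUN branch above pins the container to the first GPU with enough free memory via CUDA_VISIBLE_DEVICES, but when nothing matches, the empty if !isMatch {} block silently drops the request. A hedged sketch of the same selection with an explicit fallback (the helper is hypothetical; GPUInfo field names follow how the diff uses them):

    func pickGpu(gpus []*nodemanagerV2.GPUInfo, need int64) (seq int32, ok bool) {
        for _, g := range gpus {
            if g.MemFree > need {
                return g.Seq, true
            }
        }
        return 0, false
    }

    // inside the RUN case:
    seq, ok := pickGpu(info.GPU, model.GpuRam)
    if !ok {
        log.WithField("image", model.ImageName).Warn("no GPU with enough free memory, skipping run")
        continue
    }
    envMap[models.CudaEnv] = strconv.FormatInt(int64(seq), 10)
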
......@@ -158,6 +223,131 @@ func (n *NodeManagerHandler) DistributionMsgWorker(nodeManagerMsgChan chan *node
}
}
func (n *NodeManagerHandler) MonitorImageOp(op *nodemanagerV2.ModelOperate) {
model, err := db.GetModel(op.ImageName)
if err != nil {
log.WithError(err).Error("Op model - get model error")
return
}
model.OpTime = time.Now().Unix()
ticker := time.NewTicker(time.Second * 2)
isOp := false
switch op.Operate {
case nodemanagerV2.ModelOperateType_INSTALL:
{
now := time.Now()
for {
select {
case <-ticker.C:
if time.Since(now).Seconds() > 36000 || isOp {
break
}
imagesMap, err := n.taskMsgWorker.DockerOp.PsImageNameMap()
if err != nil {
log.WithError(err).Error("Ps image name map failed")
return
}
if imagesMap[op.ImageName] {
isOp = true
model.IsInstalled = true
model.SetupTime = time.Now().Unix()
diskSize, _ := strconv.ParseInt(model.HardwareRequire.DiskSize, 10, 64)
params := utils.BuildParams(strconv.FormatUint(model.TaskId, 10), diskSize, model.SetupTime, model.LastRunTime)
n.msgRespWorker.RegisterMsgResp(n.nodeManager, n.worker, AddModelInstalledResp, params)
return
}
}
}
}
case nodemanagerV2.ModelOperateType_DELETE:
{
now := time.Now()
for {
select {
case <-ticker.C:
if time.Since(now).Seconds() > 36000 || isOp {
break
}
imagesMap, err := n.taskMsgWorker.DockerOp.PsImageNameMap()
if err != nil {
log.WithError(err).Error("Ps image name map failed")
return
}
if !imagesMap[op.ImageName] {
isOp = true
model.IsInstalled = false
model.IsRunning = false
model.ContainerId = ""
params := utils.BuildParams(strconv.FormatUint(model.TaskId, 10))
n.msgRespWorker.RegisterMsgResp(n.nodeManager, n.worker, DelModelInstalledResp, params)
return
}
}
}
}
case nodemanagerV2.ModelOperateType_RUN:
{
now := time.Now()
for {
select {
case <-ticker.C:
if time.Since(now).Seconds() > 360 || isOp {
break
}
listContainers := n.taskMsgWorker.DockerOp.ListContainer()
if listContainers != nil && len(listContainers) > 0 {
for _, container := range listContainers {
if container.Image == op.ImageName {
isOp = true
model.ContainerId = ""
model.IsRunning = true
model.LastRunTime = time.Now().Unix()
params := utils.BuildParams(strconv.FormatUint(model.TaskId, 10), model.GpuSeq, model.GpuRam, model.LastRunTime, model.LastWorkTime, model.TotalRunCount, model.EstimatExeTime)
n.msgRespWorker.RegisterMsgResp(n.nodeManager, n.worker, AddModelRunningResp, params)
break
}
}
}
}
}
}
case nodemanagerV2.ModelOperateType_STOP:
{
now := time.Now()
for {
select {
case <-ticker.C:
if time.Since(now).Seconds() > 360 || isOp {
break
}
listContainers := n.taskMsgWorker.DockerOp.ListContainer()
if listContainers != nil && len(listContainers) > 0 {
isFound := false
for _, container := range listContainers {
if container.Image == op.ImageName {
isFound = true
}
}
if !isFound {
isOp = true
model.GpuSeq = 999
model.IsRunning = false
model.ContainerId = ""
params := utils.BuildParams(strconv.FormatUint(model.TaskId, 10))
n.msgRespWorker.RegisterMsgResp(n.nodeManager, n.worker, DelModelRunningResp, params)
break
}
}
}
}
}
}
err = db.PutModel(model.ImageName, model)
if err != nil {
log.WithError(err).Error("Db put model failed")
}
}
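
A bare break inside a select statement only exits the select, so the time.Since checks in MonitorImageOp never actually leave their for loops, and the 2-second tickers are never stopped. A minimal generic sketch of the polling pattern with a labelled break (not the project's code):

    func pollUntil(timeout, every time.Duration, done func() bool) bool {
        ticker := time.NewTicker(every)
        defer ticker.Stop()
        deadline := time.Now().Add(timeout)
    poll:
        for {
            select {
            case <-ticker.C:
                if time.Now().After(deadline) {
                    break poll // a labelled break is needed to leave the outer loop
                }
                if done() {
                    return true
                }
            }
        }
        return false
    }
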
func (n *NodeManagerHandler) MonitorStandardTaskWorker() {
//ticker := time.NewTicker(time.Second * 30)
ticker := time.NewTicker(time.Minute * 5)
......
......@@ -156,8 +156,8 @@ func DeviceInfoResp(params ...interface{}) *nodemanagerV2.WorkerMessage {
func DeviceUsageResp(params ...interface{}) *nodemanagerV2.WorkerMessage {
log.Info("DeviceUsageResp params :", params)
hardwareInfo := getHardwareInfo()
ramUsage := int32((1 - hardwareInfo.RAM.Total/hardwareInfo.RAM.Free) * 100)
diskUsage := int32((1 - hardwareInfo.DISK.Total/hardwareInfo.DISK.Free) * 100)
ramUsage := int32((1 - float64(hardwareInfo.RAM.Total)/float64(hardwareInfo.RAM.Free)) * 100)
diskUsage := int32((1 - float64(hardwareInfo.DISK.Total)/float64(hardwareInfo.DISK.Free)) * 100)
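// Note: GetOtherHardwareInfo above derives usage as 1 - free/total, while these two lines
// use Total/Free, which goes negative whenever Total > Free; the direction of the
// division may be worth double-checking.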
deviceInfoRes := &nodemanagerV2.WorkerMessage{
Message: &nodemanagerV2.WorkerMessage_DeviceUsage{
DeviceUsage: &nodemanagerV2.DeviceUsageResponse{
......@@ -293,6 +293,77 @@ func GoodbyeResp(params ...interface{}) *nodemanagerV2.WorkerMessage {
return goodbyeMsgRes
}
func AddModelInstalledResp(params ...interface{}) *nodemanagerV2.WorkerMessage {
log.Info("Add model installed info response received params:", params)
installedModels := make([]*nodemanagerV2.InstalledModel, 0)
model := &nodemanagerV2.InstalledModel{
ModelId: params[0].(string),
DiskSize: params[1].(int64),
InstalledTime: params[2].(int64),
LastRunTime: params[3].(int64),
}
installedModels = append(installedModels, model)
deviceInfoRes := &nodemanagerV2.WorkerMessage{
Message: &nodemanagerV2.WorkerMessage_AddModelInstalled{
AddModelInstalled: &nodemanagerV2.AddModelInstalled{
Models: installedModels,
},
},
}
log.Info("---------------------------------------Send add model installed info msg ------------------------------------")
return deviceInfoRes
}
func DelModelInstalledResp(params ...interface{}) *nodemanagerV2.WorkerMessage {
log.Info("Del model installed info response received params:", params)
deviceInfoRes := &nodemanagerV2.WorkerMessage{
Message: &nodemanagerV2.WorkerMessage_DelModelInstalled{
DelModelInstalled: &nodemanagerV2.DelModelInstalled{
ModelIds: []string{params[0].(string)},
},
},
}
log.Info("---------------------------------------Del model installed info msg ------------------------------------")
return deviceInfoRes
}
func AddModelRunningResp(params ...interface{}) *nodemanagerV2.WorkerMessage {
log.Info("Add model running response received params:", params)
runningModels := make([]*nodemanagerV2.RunningModel, 0)
model := &nodemanagerV2.RunningModel{
ModelId: params[0].(string),
GpuSeq: params[1].(int32),
GpuRam: params[2].(int64),
StartedTime: params[3].(int64),
LastWorkTime: params[4].(int64),
TotalRunCount: params[5].(int32),
WaitTime: params[6].(int32),
}
runningModels = append(runningModels, model)
addModelRunningRes := &nodemanagerV2.WorkerMessage{
Message: &nodemanagerV2.WorkerMessage_AddModelRunning{
AddModelRunning: &nodemanagerV2.AddModelRunning{
Models: runningModels,
},
},
}
log.Info("---------------------------------------Send Add model running response msg ------------------------------------")
return addModelRunningRes
}
func DelModelRunningResp(params ...interface{}) *nodemanagerV2.WorkerMessage {
log.Info("Del model running response received params:", params)
delModelRunningRes := &nodemanagerV2.WorkerMessage{
Message: &nodemanagerV2.WorkerMessage_DelModeRunning{
DelModeRunning: &nodemanagerV2.DelModelRunning{
ModelIds: []string{params[0].(string)},
},
},
}
log.Info("---------------------------------------Send del model running response msg ------------------------------------")
return delModelRunningRes
}
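
All four builders above index and type-assert the variadic params directly, so a caller passing the wrong count or type panics the worker. A hedged sketch of a comma-ok accessor (hypothetical helper, not part of the commit):

    func paramInt64(params []interface{}, i int) int64 {
        if i >= len(params) {
            return 0
        }
        if v, ok := params[i].(int64); ok {
            return v
        }
        return 0
    }
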
func getHardwareInfo() *nodemanagerV2.HardwareInfo {
hardwareInfo := utils.GetHardwareInfo()
gpusInfo := make([]*nodemanagerV2.GPUInfo, 0)
......
......@@ -39,6 +39,7 @@ func StartMonitor() {
go modelHandler.MonitorModelInfo()
for !monitorNm.IsInit {
time.Sleep(time.Second)
}
var connectNodeManagerCount int64 = 0
......
......@@ -46,7 +46,8 @@ type TaskOp struct {
taskParam *models.TaskParam
httpClient *http.Client
request *http.Request
ticker *time.Ticker
waitRunningTicker *time.Ticker
waitReqTicker *time.Ticker
startBeforeTaskTime time.Time
}
......@@ -142,7 +143,8 @@ func (t *TaskWorker) ComputeTaskHandler(taskMsg *nodemanagerV2.PushTaskMessage)
taskParam: &models.TaskParam{},
httpClient: &http.Client{},
request: &http.Request{},
ticker: time.NewTicker(time.Second * models.DefaultTaskTimer),
waitRunningTicker: time.NewTicker(time.Millisecond),
waitReqTicker: time.NewTicker(time.Millisecond),
startBeforeTaskTime: time.Now(),
}
t.LruCache.Add(taskMsg.TaskId, taskOp.taskExecResult)
......@@ -167,7 +169,7 @@ func (t *TaskWorker) ComputeTaskHandler(taskMsg *nodemanagerV2.PushTaskMessage)
t.ExecTaskIdIsFinished.Store(taskMsg.TaskId, true)
return
}
running, _, _ := t.foundImageIsRunning(taskOp.taskCmd.ImageName)
running, _ := t.foundImageIsRunning(taskOp.taskCmd.ImageName)
if !running {
taskOp.taskCmd.DockerCmd.HostIp = models.ZeroHost
taskOp.taskCmd.DockerCmd.HostPort = t.getExternalPort()
......@@ -180,7 +182,7 @@ func (t *TaskWorker) ComputeTaskHandler(taskMsg *nodemanagerV2.PushTaskMessage)
}
log.Infof("Started container with ID %s", containerId)
}
if err = taskOp.waitContainerRunning(t, taskOp.taskCmd.ImageName); err != nil {
if err = taskOp.waitContainerRunning(t, taskOp.taskCmd.ImageName, uint16(taskOp.taskCmd.DockerCmd.ContainerPort)); err != nil {
taskOp.taskExecResult.TaskExecError = fmt.Sprintf("%s", err.Error())
t.ExecTaskIdIsFinished.Store(taskMsg.TaskId, true)
return
......@@ -226,9 +228,9 @@ func (t *TaskWorker) GetAckResp(taskMsg *nodemanagerV2.PushTaskMessage) (isCanEx
return false, 0, 0, 0
}
since := time.Since(t.lastExecTaskStartTime)
queueWaitTime = lastTaskImageInfo.EstimatExeTime - int64(since.Seconds())
queueWaitTime = int64(lastTaskImageInfo.EstimatExeTime - int32(since.Seconds()))
if queueWaitTime < 0 {
queueWaitTime = lastTaskImageInfo.EstimatExeTime
queueWaitTime = int64(lastTaskImageInfo.EstimatExeTime)
}
}
}
......@@ -236,7 +238,7 @@ func (t *TaskWorker) GetAckResp(taskMsg *nodemanagerV2.PushTaskMessage) (isCanEx
log.WithField("imageName", taskCmd.ImageName).Error("The image is not found")
return
}
running, _, _ := t.foundImageIsRunning(taskCmd.ImageName)
running, _ := t.foundImageIsRunning(taskCmd.ImageName)
if !running {
}
......@@ -248,7 +250,7 @@ func (t *TaskWorker) GetAckResp(taskMsg *nodemanagerV2.PushTaskMessage) (isCanEx
}
if modelInfo != nil {
bootUpTime = modelInfo.StartUpTime
executeTime = modelInfo.EstimatExeTime
executeTime = int64(modelInfo.EstimatExeTime)
}
return
}
......@@ -278,7 +280,7 @@ func (t *TaskWorker) foundTaskImage(taskCmd *models.TaskCmd) (imageId string) {
return
}
func (t *TaskWorker) foundImageIsRunning(imageName string) (bool, string, uint16) {
func (t *TaskWorker) foundImageIsRunning(imageName string) (bool, string) {
containers := t.DockerOp.ListContainer()
for _, container := range containers {
if container.Image == imageName && container.State == "running" {
......@@ -288,10 +290,10 @@ func (t *TaskWorker) foundImageIsRunning(imageName string) (bool, string, uint16
ip = endPoint.IPAddress
log.Warn("Container network ip:", ip)
}
return true, ip, container.Ports[0].PrivatePort
return true, ip
}
}
return false, "", 0
return false, ""
}
func (t *TaskWorker) checkLastTaskExecStatus(taskMsg *nodemanagerV2.PushTaskMessage) {
......@@ -475,17 +477,18 @@ func (op *TaskOp) checkContainerHealthy(internalIp string, internalPort uint16)
return true, nil
}
func (op *TaskOp) waitContainerRunning(handler *TaskWorker, imageName string) error {
func (op *TaskOp) waitContainerRunning(handler *TaskWorker, imageName string, internalPort uint16) error {
maxExecTime := op.GetMaxExecTime()
log.WithField("maxExecTime", maxExecTime).Info("Waiting for container running", imageName)
for {
select {
case <-op.ticker.C:
case <-op.waitRunningTicker.C:
op.waitRunningTicker = time.NewTicker(time.Second * models.DefaultTaskTimer)
if int64(time.Since(op.startBeforeTaskTime).Seconds()) > maxExecTime-50 {
log.Errorf("%s", "The maximum execution time for this task has been exceeded")
return fmt.Errorf("%s", "The maximum execution time for this task has been exceeded")
}
running, internalIp, internalPort := handler.foundImageIsRunning(imageName)
running, internalIp := handler.foundImageIsRunning(imageName)
if !running {
continue
}
......@@ -494,6 +497,7 @@ func (op *TaskOp) waitContainerRunning(handler *TaskWorker, imageName string) er
log.WithField("err", err).Errorf("check container healthy failed")
return fmt.Errorf("%s-%s", "check container healthy failed", err.Error())
} else if !isReqSuccess {
continue
}
}
......@@ -539,7 +543,8 @@ func (op *TaskOp) waitReqContainerOk(dockerOp *operate.DockerOp) error {
var err error
for {
select {
case <-op.ticker.C:
case <-op.waitReqTicker.C:
op.waitRunningTicker = time.NewTicker(time.Second * models.DefaultTaskTimer)
if int64(time.Since(op.startBeforeTaskTime).Seconds()) > maxExecTime-50 {
log.Errorf("%s", "The maximum execution time for this task has been exceeded")
return fmt.Errorf("%s", "The maximum execution time for this task has been exceeded")
......
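
Both wait loops now start from a millisecond ticker and then allocate a brand-new ticker on every tick (and the waitReqContainerOk branch replaces waitRunningTicker where waitReqTicker looks intended). Since Go 1.15 a single ticker can simply be Reset; a sketch of the shape:

    ticker := time.NewTicker(time.Millisecond) // fire almost immediately the first time
    defer ticker.Stop()
    first := true
    for {
        <-ticker.C
        if first {
            ticker.Reset(2 * time.Second) // corresponds to models.DefaultTaskTimer
            first = false
        }
        // ... poll here; return or break when finished or past the deadline ...
    }
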
......@@ -178,10 +178,17 @@ func (d *DockerOp) CreateContainer(imageName string, dockerCmd *models.DockerCmd
exposePortSet[natPort] = struct{}{}
portMap := make(nat.PortMap)
portMap[natPort] = portBinds
env := make([]string, 0)
if dockerCmd.EnvMap != nil && len(dockerCmd.EnvMap) > 0 {
for key, val := range dockerCmd.EnvMap {
env = append(env, fmt.Sprintf("%s=%s", key, val))
}
}
resp, err := d.dockerClient.ContainerCreate(ctx, &container.Config{
ExposedPorts: exposePortSet,
Image: imageName,
Tty: false,
Env: env,
}, &container.HostConfig{
PortBindings: portMap,
AutoRemove: true, // automatically remove the container once it stops
......@@ -282,10 +289,10 @@ func (d *DockerOp) PsImageNameMap() (map[string]bool, error) {
return res, nil
}
func (d *DockerOp) PullImage(info *models.ModelInfo) {
response, err := d.dockerClient.ImagePull(context.Background(), info.ImageName, types.ImagePullOptions{})
func (d *DockerOp) PullImage(imageName string) {
response, err := d.dockerClient.ImagePull(context.Background(), imageName, types.ImagePullOptions{})
if err != nil {
log.Errorf("Error pulling image from %s: %v", info.ImageName, err)
log.Errorf("Error pulling image from %s: %v", imageName, err)
return
}
defer func(response io.ReadCloser) {
......@@ -294,22 +301,24 @@ func (d *DockerOp) PullImage(info *models.ModelInfo) {
log.WithError(err).Error("Close image pull response failed")
}
}(response)
// Read the image pull output
//if _, err = io.ReadAll(response); err != nil {
// log.WithError(err).Error("Read image pull response failed")
// return
//}
log.Info("Image pulled successfully.")
}
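
ImagePull streams progress, and the pull is only known to be finished once that stream reaches EOF, which is what the commented-out ReadAll was getting at. If callers such as the install path need PullImage to block until completion, draining the stream is the usual pattern; a sketch using io.Discard:

    // Drain the pull output so the function returns only after the daemon finished the pull.
    if _, err := io.Copy(io.Discard, response); err != nil {
        log.WithError(err).Error("Read image pull response failed")
        return
    }
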
func (d *DockerOp) RmImage(imageId string) {
func (d *DockerOp) RmImage(imageName string) {
ctx, cancel := context.WithTimeout(context.Background(), time.Minute*20)
defer cancel()
// Remove the image
_, err := d.dockerClient.ImageRemove(ctx, imageId, types.ImageRemoveOptions{})
if err != nil {
panic(err)
imageId := ""
images, _ := d.PsImages()
for _, image := range images {
for _, tag := range image.RepoTags {
if tag == imageName {
imageId = image.ID
break
}
}
}
// Remove the image
d.dockerClient.ImageRemove(ctx, imageId, types.ImageRemoveOptions{})
log.Info("Image deleted successfully.")
}
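
The rewritten RmImage ignores a failed tag lookup (imageId stays empty) and the ImageRemove error, yet still logs success. A hedged variant of the same lookup with those checks added (method name is hypothetical; assumes the file's existing imports plus fmt):

    func (d *DockerOp) rmImageChecked(imageName string) error {
        ctx, cancel := context.WithTimeout(context.Background(), 20*time.Minute)
        defer cancel()
        images, err := d.PsImages()
        if err != nil {
            return err
        }
        for _, image := range images {
            for _, tag := range image.RepoTags {
                if tag == imageName {
                    _, err = d.dockerClient.ImageRemove(ctx, image.ID, types.ImageRemoveOptions{})
                    return err
                }
            }
        }
        return fmt.Errorf("image %s not found locally", imageName)
    }
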
......
package validator
import (
"context"
"encoding/json"
"example.com/m/conf"
"example.com/m/db"
"example.com/m/log"
"example.com/m/operate"
nodemanagerv1 "github.com/odysseus/odysseus-protocol/gen/proto/go/nodemanager/v1"
witnessV1 "github.com/odysseus/odysseus-protocol/gen/proto/go/witness/v1"
"google.golang.org/grpc"
"math/rand"
"time"
)
type ProofWorker struct {
productProofChan chan *witnessV1.Proof
consumeProofChan chan []*witnessV1.Proof
isCommitProof map[string]bool
}
func NewProofWorker() *ProofWorker {
return &ProofWorker{
productProofChan: make(chan *witnessV1.Proof, 1000),
consumeProofChan: make(chan []*witnessV1.Proof, 1000),
isCommitProof: make(map[string]bool, 0),
}
}
func (p *ProofWorker) ProductProof(nmResultMsg *nodemanagerv1.ProofTaskResult, taskType uint64, reqHash []byte, respHash []byte, containerSign, minerSign []byte) {
log.Info("ProductProof received workLoad:", nmResultMsg.Workload)
p.productProofChan <- &witnessV1.Proof{
Workload: nmResultMsg.Workload,
TaskId: nmResultMsg.TaskId,
ReqHash: reqHash,
RespHash: respHash,
ManagerSignature: nmResultMsg.ManagerSignature,
ContainerSignature: containerSign,
MinerSignature: minerSign,
TaskType: taskType,
Timestamp: nmResultMsg.Timestamp,
}
}
func (p *ProofWorker) ProofStorage() {
go func(productProofChan chan *witnessV1.Proof) {
for {
select {
case proof := <-productProofChan:
{
proofByte, err := json.Marshal(proof)
if err != nil {
log.Error("Failed to marshal proof: ", err)
continue
}
err = db.Put(proof.TaskId, proofByte)
if err != nil {
log.Error("leveldb put proof failed: ", err)
continue
}
p.isCommitProof[proof.TaskId] = false
}
}
}
}(p.productProofChan)
timer := time.NewTicker(time.Minute)
randomMinute := getRandInt()
for {
select {
case <-timer.C:
nowTime := time.Now()
min := nowTime.Minute()
if min == 0 {
randomMinute = getRandInt()
}
if nowTime.Hour() == 23 {
randomMinute = 59
}
// Check whether we are inside the submission window (minute 40-59)
if min >= 40 && min <= 59 && min == randomMinute {
proofs := make([]*witnessV1.Proof, 0)
iter, err := db.NewIterator()
if err != nil {
log.Error("db new iterator failed: ", err)
continue
}
if iter == nil {
log.Warn("level db iterator is nil")
continue
}
for iter.Next() {
proof := &witnessV1.Proof{}
err := json.Unmarshal(iter.Value(), proof)
if err != nil {
log.Error("Error parsing proof from database: ", err)
continue
}
if p.isCommitProof[proof.TaskId] {
continue
}
p.isCommitProof[proof.TaskId] = true
proofs = append(proofs, proof)
//err = db.Delete(iter.Key())
//if err != nil {
// log.Error("Error deleting proof from database: ", err)
// return
//}
}
if len(proofs) > 0 {
p.consumeProofChan <- proofs
log.Info("---------------------------Storage proof data---------------------------")
}
}
}
}
}
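
isCommitProof is written from the storage goroutine and read and written again from the timer loop above, which is a data race on a plain map. One hedged option is a small mutex wrapper (the mu field is hypothetical); persisting the flag next to the proof in LevelDB would work as well:

    func (p *ProofWorker) markProof(taskId string, committed bool) {
        p.mu.Lock()
        p.isCommitProof[taskId] = committed
        p.mu.Unlock()
    }

    func (p *ProofWorker) proofCommitted(taskId string) bool {
        p.mu.Lock()
        defer p.mu.Unlock()
        return p.isCommitProof[taskId]
    }
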
func (p *ProofWorker) CommitWitness() {
validatorClient := operate.ConnValidatorGrpc(conf.GetConfig().ValidatorUrl)
for {
select {
case proofs := <-p.consumeProofChan:
proofsReq := &witnessV1.PushProofRequest{
Proofs: proofs,
MinerAddress: conf.GetConfig().SignPublicAddress.Hex(),
RewardAddress: conf.GetConfig().BenefitAddress,
}
pushProof, err := validatorClient.PushProof(context.Background(), proofsReq, grpc.EmptyCallOption{})
if err != nil {
log.Error("Push proof failed :", err)
continue
}
workload := pushProof.GetWorkload()
log.Info("Commit proof time:", time.Now())
log.Info("Push proof response received : %v", workload)
log.Info("---------------------------Commit witness data---------------------------")
}
}
}
func getRandInt() int {
return rand.Intn(20) + 40
}