Commit 94e34c67 authored by duanjinfei's avatar duanjinfei

master v2 first commit

parent 588db8db
package main
import (
"example.com/m/conf"
"example.com/m/log"
"github.com/spf13/cobra"
)
func init() {
//RootCmd.AddCommand(paramCmd)
rewardAddr = *paramCmd.PersistentFlags().StringP("reward", "r", "", "Print detail version info")
}
// versionCmd represents the base command when called without any subcommands
// paramCmd updates node parameters from the CLI; currently it only applies
// the reward (benefit) address when one was supplied.
var paramCmd = &cobra.Command{
	Use:   "param",
	Short: "",
	Long:  ``,
	Run: func(cmd *cobra.Command, args []string) {
		// Nothing to do when no address was provided.
		if rewardAddr == "" {
			return
		}
		if ok := conf.GetConfig().SetRewardAddress(rewardAddr); !ok {
			log.Error("Please set right reward address")
		}
	},
}
......@@ -4,6 +4,7 @@ import (
"example.com/m/conf"
"example.com/m/log"
"example.com/m/nm"
"example.com/m/utils"
"fmt"
"github.com/astaxie/beego"
"github.com/fsnotify/fsnotify"
......@@ -13,14 +14,15 @@ import (
)
var (
	// rewardAddr is the benefit address credited for completed tasks;
	// externalIp is this server's reachable IP; opSys names the host OS.
	// (Removed a stale duplicate declaration line left over from a diff,
	// which also declared the no-longer-used dockerServer.)
	rewardAddr, externalIp, opSys string
	// debug switches the log level to debug when true.
	debug bool
)
// init registers the root command's persistent flags and hooks config
// loading into cobra's initialization.
func init() {
	// Removed stale duplicate registrations left over from a diff: the old
	// --reward/--externalIp lines and --docker_server, whose -d shorthand
	// collided with --debug. Registering the same flag or shorthand twice
	// panics in pflag.
	// NOTE(review): hard-coded defaults for reward address and external IP
	// look like development values — confirm before release.
	RootCmd.PersistentFlags().StringVarP(&rewardAddr, "reward", "r", "0x0Fb196385c8826e3806ebA2cA2cb78B26E08fEEe", "please enter a reward address")
	RootCmd.PersistentFlags().StringVarP(&externalIp, "externalIp", "e", "192.168.1.120", "please enter server external ip address")
	RootCmd.PersistentFlags().StringVarP(&opSys, "opSys", "s", "", "please enter you op sys name : win、linux、mac")
	RootCmd.PersistentFlags().BoolVarP(&debug, "debug", "d", false, "set log level debug")
	cobra.OnInitialize(initConfig)
}
......@@ -40,21 +42,37 @@ var RootCmd = &cobra.Command{
return
}
}
isSetDockerServer := conf.GetConfig().SetDockerServerUrl(dockerServer)
if !isSetDockerServer {
log.Error("Enter right docker server address:", dockerServer)
return
}
log.Info("Enter docker server url:", dockerServer)
isSetReward := conf.GetConfig().SetRewardAddress(rewardAddr)
if !isSetReward {
isSetBenefit := conf.GetConfig().SetBenefitAddress(rewardAddr)
if !isSetBenefit {
log.Error("Please set right reward address")
return
}
fileBenefitAcc, _ := utils.ReadBenefitFile()
if fileBenefitAcc != nil && len(fileBenefitAcc) > 0 {
nm.HistoryBenefitAcc = fileBenefitAcc
}
isExist := false
for _, acc := range nm.HistoryBenefitAcc {
if acc == rewardAddr {
isExist = true
}
}
if !isExist {
nm.HistoryBenefitAcc = append(nm.HistoryBenefitAcc, rewardAddr)
err := utils.WriteBenefitFile(nm.HistoryBenefitAcc)
if err != nil {
log.Error("WriteBenefitFile failed with error:", err)
return
}
}
conf.GetConfig().SetExternalIp(externalIp)
log.Info("Enter reward address:", rewardAddr)
log.InitLog(log.LogConfig{Path: "logs", Level: "debug", Save: 3})
go nm.StartMonitor()
if debug {
log.InitLog(log.LogConfig{Path: "logs", Level: "debug", Save: 3})
} else {
log.InitLog(log.LogConfig{Path: "logs", Level: "info", Save: 3})
}
//go nm.StartMonitor()
beego.Run()
},
}
......
appname = node-server
httpport = 9091
runmode = dev
autorender = false
copyrequestbody = true
\ No newline at end of file
......@@ -11,7 +11,6 @@ import (
)
type Config struct {
SignPrv string
SignPub string
DockerServer string
BenefitAddress string
......@@ -30,6 +29,7 @@ type Config struct {
OpSys string `json:"op_sys" mapstructure:"op_sys"`
ReplicateImageNameSuffix string `json:"replicate_image_name_suffix" mapstructure:"replicate_image_name_suffix"`
IsStopLastContainer bool `json:"is_stop_last_container" mapstructure:"is_stop_last_container"`
DiskUsage int64 `json:"disk_usage" mapstructure:"disk_usage"`
}
var _cfg *Config = nil
......@@ -51,7 +51,7 @@ func (c *Config) GetExternalIp() string {
return c.ExternalIp
}
func (c *Config) SetRewardAddress(addr string) bool {
func (c *Config) SetBenefitAddress(addr string) bool {
isAddr := common.IsHexAddress(addr)
if isAddr {
c.BenefitAddress = addr
......
......@@ -10,5 +10,6 @@
"wait_last_task_exec_time": 60,
"op_sys": "linux",
"replicate_image_name_suffix": "docker.aigic.ai/ai",
"is_stop_last_container": true,
"disk_usage":80
}
\ No newline at end of file
......@@ -6,6 +6,7 @@ import (
"example.com/m/models"
"example.com/m/nm"
"example.com/m/operate"
"example.com/m/utils"
nodeManagerV1 "github.com/odysseus/odysseus-protocol/gen/proto/go/nodemanager/v1"
"io"
)
......@@ -34,22 +35,41 @@ func (c *NodeController) SetNmSeed() {
c.ResponseInfo(200, "set seed successful", "")
}
func (c *NodeController) SetRewardAddress() {
func (c *NodeController) SetBenefitAddress() {
bodyReq, err := io.ReadAll(c.Ctx.Request.Body)
if err != nil || bodyReq == nil {
c.ResponseInfo(500, "param error", "")
return
}
req := &models.RewardAddress{}
req := &models.BenefitAddress{}
err = json.Unmarshal(bodyReq, req)
if err != nil {
c.ResponseInfo(500, "param error", "")
return
}
if !conf.GetConfig().SetRewardAddress(req.Address) {
if !conf.GetConfig().SetBenefitAddress(req.Address) {
c.ResponseInfo(500, "param is not address", "")
return
}
isExist := false
for _, s := range nm.HistoryBenefitAcc {
if s == req.Address {
isExist = true
}
}
c.ResponseInfo(200, "sign successful", "")
if !isExist {
nm.HistoryBenefitAcc = append(nm.HistoryBenefitAcc, req.Address)
err = utils.WriteBenefitFile(nm.HistoryBenefitAcc)
if err != nil {
c.ResponseInfo(500, "Write benefit file failed", "")
}
}
c.ResponseInfo(200, "set benefit address successful", "")
}
// ListHistoryBenefitAddress returns every benefit address previously stored
// in the benefit-history file.
// NOTE(review): the read error is deliberately ignored — presumably the file
// may not exist before the first address is saved, in which case an empty
// list is returned; confirm this is the intended best-effort behavior.
func (c *NodeController) ListHistoryBenefitAddress() {
fileBenefitAcc, _ := utils.ReadBenefitFile()
c.ResponseInfo(200, "list history benefit address successful", fileBenefitAcc)
}
func (c *NodeController) AddNodeManager() {
......@@ -69,7 +89,7 @@ func (c *NodeController) AddNodeManager() {
Endpoint: req.EndPoint,
}
nm.AddNodeManager(nodeManager)
c.ResponseInfo(200, "sign successful", "")
c.ResponseInfo(200, "add node manager successful", "")
}
......@@ -86,3 +106,27 @@ func (c *NodeController) GetNodeManagers() {
}
c.ResponseInfo(200, "Get used node manager successful", res)
}
// UpdateRecvStatus toggles whether this worker accepts new tasks, based on
// the is_recv flag in the JSON request body.
func (c *NodeController) UpdateRecvStatus() {
	payload, err := io.ReadAll(c.Ctx.Request.Body)
	if err != nil || payload == nil {
		c.ResponseInfo(500, "param error", "")
		return
	}
	req := &models.RecvTask{}
	if err = json.Unmarshal(payload, req); err != nil {
		c.ResponseInfo(500, "param error", "")
		return
	}
	nm.IsRecvTask = req.IsRecv
	c.ResponseInfo(200, "update recv status successful", "")
}
// GetRecvStatus reports whether the worker is currently accepting tasks.
func (c *NodeController) GetRecvStatus() {
	status := nm.IsRecvTask
	c.ResponseInfo(200, "get recv status successful", status)
}
// GetConfigInfo returns the node's full configuration object.
func (c *NodeController) GetConfigInfo() {
	cfg := conf.GetConfig()
	c.ResponseInfo(200, "get config successful", cfg)
}
package controllers
import (
"encoding/json"
"example.com/m/conf"
"example.com/m/models"
"example.com/m/nm"
"example.com/m/utils"
"io"
)
// StateController serves read-only node-state HTTP endpoints: running state,
// worker account info, and hardware (GPU/CPU/RAM/disk) details.
type StateController struct {
BaseController
}
// GetRunningState returns the node's current running-state snapshot.
func (c *StateController) GetRunningState() {
	c.ResponseInfo(200, "get running state successful", nm.RunningState)
}
// GetWorkerInfo returns this worker's signing account address and chain id.
func (c *StateController) GetWorkerInfo() {
	cfg := conf.GetConfig()
	account := models.WorkerAccount{
		WorkerAcc: cfg.SignPublicAddress.Hex(),
		ChainID:   cfg.ChainID,
	}
	c.ResponseInfo(200, "get worker info successful", account)
}
// GetListGpuInfo returns the list of GPUs reported by the hardware-info
// service, or a 500 response when no hardware data is available.
func (c *StateController) GetListGpuInfo() {
	if info := utils.GetHardwareInfo(); info != nil && info.Data != nil {
		c.ResponseInfo(200, "get list gpu info successful", info.Data.Gpus)
		return
	}
	c.ResponseInfo(500, "get list gpu info failed", nil)
}
// GetGpuUsageInfo returns usage details for the GPU whose sequence number
// matches the seq field of the JSON request body.
func (c *StateController) GetGpuUsageInfo() {
	bodyReq, err := io.ReadAll(c.Ctx.Request.Body)
	if err != nil || bodyReq == nil {
		c.ResponseInfo(500, "param error", "")
		return
	}
	req := &models.GpuUsageReq{}
	if err = json.Unmarshal(bodyReq, req); err != nil {
		c.ResponseInfo(500, "param error", "")
		return
	}
	info := utils.GetHardwareInfo()
	// Also guard info.Data (as GetListGpuInfo does); the original checked
	// only info and would panic dereferencing a nil Data.
	if info != nil && info.Data != nil {
		for _, gpu := range info.Data.Gpus {
			if gpu.Seq == req.Seq {
				// Fixed copy-pasted "set seed successful" message.
				c.ResponseInfo(200, "get gpu usage info successful", gpu)
				return
			}
		}
	}
	c.ResponseInfo(500, "get gpu usage info failed", nil)
}
// GetOtherHardwareInfo returns CPU, RAM and disk summary values from the
// hardware-info service.
func (c *StateController) GetOtherHardwareInfo() {
	info := utils.GetHardwareInfo()
	// Guard against missing hardware data; the original dereferenced
	// info.Data and indexed Disk[0] unconditionally and could panic.
	if info == nil || info.Data == nil || len(info.Data.Disk) == 0 {
		c.ResponseInfo(500, "get hardware info failed", nil)
		return
	}
	// NOTE(review): the field mapping looks off — CpuTemp is filled from the
	// CPU usage value and RamUsage/DiskUsage from totals rather than usage;
	// confirm against the hardware-info payload semantics.
	res := &models.OtherHardwareInfoResp{
		CpuTemp:   info.Data.Cpus.Usage,
		RamUsage:  info.Data.Mem.Total,
		DiskUsage: info.Data.Disk[0].Total,
	}
	c.ResponseInfo(200, "get hardware info successful", res)
}
......@@ -7,6 +7,7 @@ require (
github.com/docker/docker v24.0.7+incompatible
github.com/docker/go-connections v0.5.0
github.com/ethereum/go-ethereum v1.13.11
github.com/fsnotify/fsnotify v1.7.0
github.com/go-cmd/cmd v1.4.2
github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da
github.com/lestrrat-go/file-rotatelogs v2.4.0+incompatible
......@@ -36,7 +37,6 @@ require (
github.com/docker/distribution v2.8.3+incompatible // indirect
github.com/docker/go-units v0.5.0 // indirect
github.com/ethereum/c-kzg-4844 v0.4.0 // indirect
github.com/fsnotify/fsnotify v1.7.0 // indirect
github.com/go-ole/go-ole v1.3.0 // indirect
github.com/gogo/protobuf v1.3.2 // indirect
github.com/golang/protobuf v1.5.3 // indirect
......
......@@ -10,6 +10,7 @@ import (
"io"
"net/http"
"os"
"strconv"
"strings"
"time"
)
......@@ -18,6 +19,8 @@ type ModelHandler struct {
dockerOp *operate.DockerOp
client *http.Client
modelsFileName string
hotModel map[string]bool
popularModel map[string]bool
}
func NewModelHandler(dockerOp *operate.DockerOp) *ModelHandler {
......@@ -75,7 +78,7 @@ func (m *ModelHandler) MonitorModelInfo() {
}
if !imageNameMap[modelInfo.ImageName] {
// todo: 判断机器资源是否够用
isPull := isResourceEnough(modelInfo)
isPull := m.isResourceEnough(modelInfo)
// todo: 如果够用
if isPull && modelInfo.PublishStatus == models.ModelPublishStatusYes {
log.WithField("model image name", modelInfo.ImageName).Info("pulling image")
......@@ -99,6 +102,9 @@ func (m *ModelHandler) MonitorModelInfo() {
}
}
// heatDataHandler is a placeholder for processing model heat/popularity
// statistics from the model-info list; it currently does nothing.
// TODO: implement.
func (m *ModelHandler) heatDataHandler(modelInfosResp []*models.ModelInfo) {
}
func (m *ModelHandler) ReadModels() ([]*models.ModelInfo, error) {
bodyBytes, err := os.ReadFile(m.modelsFileName)
if err != nil {
......@@ -122,6 +128,39 @@ func (m *ModelHandler) ReadModels() ([]*models.ModelInfo, error) {
return resp.Data, nil
}
func isResourceEnough(modelInfo *models.ModelInfo) bool {
func (m *ModelHandler) isResourceEnough(modelInfo *models.ModelInfo) bool {
//isDownload := m.checkDiskUsage(modelInfo)
//isDownload = true
//if !isDownload {
// return isDownload
//}
return true
}
// checkDiskUsage reports whether there is enough free disk space to download
// the given model image, and that current disk usage is under the configured
// DiskUsage threshold.
// NOTE(review): assumes GetDockerInfo returns sizes in bytes and usageSize
// as a percentage comparable to conf DiskUsage — confirm against DockerOp.
func (m *ModelHandler) checkDiskUsage(modelInfo *models.ModelInfo) bool {
	totalSize, usedSize, availSize, usageSize, err := m.dockerOp.GetDockerInfo()
	if err != nil {
		log.WithError(err).Error("Disk resource is not enough")
		return false
	}
	log.WithField("TotalSize:", totalSize).WithField("UsedSize:", usedSize).WithField("AvailSize:", availSize).Info("The disk info")
	if conf.GetConfig().DiskUsage < usageSize {
		return false
	}
	modelDiskSizeGB, err := strconv.ParseInt(modelInfo.HardwareRequire.DiskSize, 10, 64)
	if err != nil {
		log.Errorf("Error parsing model disk usage failed: %v", err)
		return false
	}
	// Bug fix: converting GB to bytes needs three factors of 1024; the
	// original multiplied only twice (producing MB), under-estimating the
	// model's size by 1024x and letting oversized models pass the check.
	modelDiskSizeByte := modelDiskSizeGB * 1024 * 1024 * 1024
	if availSize < modelDiskSizeByte {
		log.Error("The hard drive is running out of space")
		return false
	}
	return true
}
// checkHeat is a stub for a model-heat (popularity) check; it currently
// always returns false. TODO: implement.
func (m *ModelHandler) checkHeat() bool {
return false
}
package models
// Shared constant definitions for task handling, HTTP semantics, model
// publishing status, and benefit-address persistence.
// (The diff residue duplicated the entire list; Go forbids redeclaring
// top-level constants, so the block is collapsed to a single copy that
// includes the newly added EncryptionKey and history-file path.)
const (
	// LRU-cache key suffixes for per-task artifacts.
	TaskType            = "taskType"
	ContainerSign       = "container"
	MinerSign           = "miner"
	ReqHash             = "reqHash"
	RespHash            = "respHash"
	ResultFileExpiresDB = "expires"
	ContentType         = "type"

	// HTTP/result-delivery semantics.
	RedirectCode = 303
	UseFileCache = "Use-File-Cache"
	UseRedirect  = "Use-Redirect"
	Prefer       = "Prefer"
	Async        = "respond-async"
	MaxExecTime  = "MaxExecTime"

	// Container health checking.
	HealthCheckAPI = "/health-check"
	READY          = "READY"
	ZeroHost       = "0.0.0.0"

	// Model publish status values.
	ModelPublishStatusYes = 1
	ModelPublishStatusNo  = 2

	// Defaults (seconds).
	DefaultMaxExecTime = 300
	DefaultTaskTimer   = 2

	// EncryptionKey encrypts the persisted benefit-address file.
	EncryptionKey = "uxhendjFYsoWFnsO"
	// HistoryBenefitAddressDirectory is the benefit-address history path.
	HistoryBenefitAddressDirectory = "data/benefitList"
)
......@@ -3,3 +3,57 @@ package models
// BarkOutPut is the decoded JSON output of the Bark text-to-audio model.
type BarkOutPut struct {
// AudioOut holds the generated audio output as returned by the model.
AudioOut string `json:"audio_out"`
}
// SwinirOutPut is the decoded JSON output of the SwinIR image-restoration model.
type SwinirOutPut struct {
File string `json:"file"`
}
// WhisperOutPut is the decoded JSON output of the Whisper speech-to-text model.
// NOTE(review): Start is int while End is float64, and the tag here is
// "avg_log_prob" whereas WhisperDiarizationOutPut uses "avg_logprob" —
// presumably these mirror the upstream model payloads; verify against real
// responses.
type WhisperOutPut struct {
Segments []struct {
Id string `json:"id"`
End float64 `json:"end"`
Seek int `json:"seek"`
Text string `json:"text"`
Start int `json:"start"`
Tokens []int `json:"tokens"`
AvgLogprob float64 `json:"avg_log_prob"`
Temperature int `json:"temperature"`
NoSpeechProb float64 `json:"no_speech_prob"`
CompressionRatio float64 `json:"compression_ratio"`
} `json:"segments"`
}
// WhisperDiarizationOutPut is the decoded JSON output of the Whisper
// diarization model: per-segment text with speaker labels and word timings.
// NOTE(review): segment End/Start are strings here but word End/Start are
// float64 — presumably matching the upstream payload; verify.
type WhisperDiarizationOutPut struct {
Language string `json:"language"`
Segments []struct {
End string `json:"end"`
Text string `json:"text"`
Start string `json:"start"`
Words []struct {
End float64 `json:"end"`
Word string `json:"word"`
Start float64 `json:"start"`
Probability float64 `json:"probability"`
} `json:"words"`
Speaker string `json:"speaker"`
AvgLogprob float64 `json:"avg_logprob"`
} `json:"segments"`
NumSpeakers int `json:"num_speakers"`
}
// IncrediblyFastWhisper is the decoded JSON output of the
// incredibly-fast-whisper model: full text plus timestamped chunks.
type IncrediblyFastWhisper struct {
Text string `json:"text"`
Chunks []struct {
Text string `json:"text"`
Timestamp []float64 `json:"timestamp"`
} `json:"chunks"`
}
// ClipFeatures is the decoded JSON output of a CLIP feature-extraction model:
// the input and its embedding vector.
type ClipFeatures struct {
Input string `json:"input"`
Embedding []float64 `json:"embedding"`
}
// AllMpnetBaseV2 is the decoded JSON output of the all-mpnet-base-v2
// sentence-embedding model.
type AllMpnetBaseV2 struct {
Embedding []float64 `json:"embedding"`
}
package models
import (
nodeManagerV1 "github.com/odysseus/odysseus-protocol/gen/proto/go/nodemanager/v1"
nodemanagerV2 "github.com/odysseus/odysseus-protocol/gen/proto/go/nodemanager/v2"
"sync"
"time"
)
......@@ -62,6 +62,69 @@ type GpuInfo struct {
Gpu string `json:"gpu"`
}
type Gpu struct {
Seq int64 `json:"seq"`
Uuid string `json:"uuid"`
Model string `json:"model "`
Performance int64 `json:"performance"`
PowerRating int64 `json:"power_rating"`
MemTotal int64 `json:"mem_total"`
MemFree int64 `json:"mem_free"`
Usage int64 `json:"usage"`
Temp int64 `json:"temp "`
PowerRt int64 `json:"power_rt"`
}
// Cpu describes the host CPU as reported by the hardware-info service.
type Cpu struct {
Model string `json:"model"`
Number int32 `json:"number"`
Cores int32 `json:"cores"`
Threads int32 `json:"threads"`
Usage int32 `json:"usage"`
Frequency string `json:"frequency"`
}
// CoreCpuInfo describes a single CPU package (socket) entry.
type CoreCpuInfo struct {
Seq int64 `json:"seq"`
Model string `json:"model"`
Thread int64 `json:"thread"`
Core int64 `json:"core"`
}
// Mem holds RAM totals as reported by the hardware-info service.
// NOTE(review): units (bytes vs KB/MB) are not visible here — confirm
// against the hardware-info payload.
type Mem struct {
Total int64 `json:"total"`
Free int64 `json:"free"`
}
// Disk describes a block device and its mount points; sizes are in bytes
// per the field names.
type Disk struct {
Device string `json:"device"`
MountPoints []string `json:"mount_points"`
FreeBytes int64 `json:"free_bytes"`
SizeBytes int64 `json:"size_bytes"`
}
// Net describes a network interface and its current transfer rates.
type Net struct {
Device string `json:"device"`
Speed int32 `json:"speed"`
SendRate int32 `json:"send_rate"`
RecvRate int32 `json:"recv_rate"`
Mac string `json:"mac"`
}
// HardwareInfo aggregates all hardware components reported by the
// hardware-info service.
type HardwareInfo struct {
Gpus []*Gpu `json:"GPU"`
Cpus *Cpu `json:"CPU"`
Mem Mem `json:"RAM"`
Disk []*Disk `json:"DISK"`
Networks []*Net `json:"NET"`
}
// HardwareInfoRep is the envelope returned by the hardware-info endpoint.
type HardwareInfoRep struct {
Code int `json:"code"`
Msg string `json:"msg"`
// Data is nil when hardware collection failed; callers must nil-check.
Data *HardwareInfo `json:"hardware"`
}
type ModelInfo struct {
Time time.Time `json:"time"`
Count int64 `json:"count"`
......@@ -77,6 +140,7 @@ type ModelInfo struct {
PublishStatus int `json:"publish_status"`
EstimatExeTime int64 `json:"estimat_exe_time"`
StartUpTime int64 `json:"start_up_time"`
RunningMem int64 `json:"running_mem"`
}
type HealthyCheck struct {
......@@ -105,7 +169,7 @@ type NodeManagerClient struct {
LastHeartTime time.Time
PublicKey string
Endpoint string
Client nodeManagerV1.NodeManagerServiceClient
Client nodemanagerV2.NodeManagerServiceClient
Status bool
IsDel bool
IsSelected bool
......
package models
type RewardAddress struct {
type BenefitAddress struct {
Address string `json:"address"`
}
// RecvTask is the JSON request body for UpdateRecvStatus: whether the
// worker should accept new tasks.
type RecvTask struct {
IsRecv bool `json:"is_recv"`
}
// SeedUrl is the JSON request body carrying a node-manager seed address.
type SeedUrl struct {
Seed string `json:"seed"`
}
......@@ -13,6 +17,43 @@ type NodeManagerReq struct {
EndPoint string `json:"end_point"`
}
// RunningState is a live snapshot of this worker's runtime status.
type RunningState struct {
// RunningTime is the unix timestamp recorded at process start.
RunningTime int64 `json:"running_time"`
CompletedTaskCount int `json:"completed_task_count"`
NmIpAddr string `json:"nm_ip_addr"`
// NmDelayTime is the node-manager heartbeat delay in milliseconds.
NmDelayTime int64 `json:"nm_delay_time"`
}
// WorkerAccount identifies this worker: its signing address and chain id.
type WorkerAccount struct {
WorkerAcc string `json:"worker_acc"`
ChainID int64 `json:"chain_id"`
}
// GpuInfoResp is a per-GPU summary returned by the state API.
type GpuInfoResp struct {
Seq int `json:"seq"`
Name string `json:"name"`
TotalMem int64 `json:"total_mem"`
UtilMem int64 `json:"util_mem"`
FreeMem int64 `json:"free_mem"`
}
// GpuUsageReq is the JSON request body for GetGpuUsageInfo, selecting a GPU
// by its sequence number.
type GpuUsageReq struct {
Seq int64 `json:"seq"`
}
// GpuUsageInfoResp is the usage summary for a single GPU.
type GpuUsageInfoResp struct {
Seq int `json:"seq"`
Occupy int `json:"occupy"`
Usage int64 `json:"usage"`
Temp int `json:"temp"`
}
// OtherHardwareInfoResp is the non-GPU hardware summary returned by
// GetOtherHardwareInfo.
// NOTE(review): the caller fills CpuTemp from CPU usage and RamUsage /
// DiskUsage from totals — the field names may not match the values; verify.
type OtherHardwareInfoResp struct {
CpuTemp int64 `json:"cpu_temp"`
RamUsage int64 `json:"ram_usage"`
DiskUsage int64 `json:"disk_usage"`
}
type Resp struct {
Code int `json:"code"`
Msg string `json:"msg"`
......
package nm
import (
"example.com/m/log"
"example.com/m/models"
"example.com/m/operate"
nodeManagerV1 "github.com/odysseus/odysseus-protocol/gen/proto/go/nodemanager/v1"
nodemanagerV2 "github.com/odysseus/odysseus-protocol/gen/proto/go/nodemanager/v2"
"time"
)
type NodeManager struct {
Info *nodeManagerV1.NodeManagerInfo `json:"info,omitempty"`
Info *nodemanagerV2.NodeManagerInfo `json:"info,omitempty"`
IsUsed bool `json:"isUsed,omitempty"`
IsExist bool `json:"isExist,omitempty"`
}
var (
// HistoryBenefitAcc caches every benefit address configured on this node.
HistoryBenefitAcc []string
// RunningState tracks uptime, completed-task count and node-manager latency.
RunningState *models.RunningState
// IsRecvTask gates whether new tasks are accepted.
IsRecvTask bool
)
func init() {
IsRecvTask = true
HistoryBenefitAcc = make([]string, 0)
RunningState = &models.RunningState{
RunningTime: time.Now().Unix(),
CompletedTaskCount: 0,
NmIpAddr: "",
NmDelayTime: 0,
}
}
// GetNodeManagers returns the package-level list of known node managers.
// Callers receive the shared slice, not a copy.
func GetNodeManagers() []*NodeManager {
return nodeManagerArr
}
func AddNodeManager(node *nodeManagerV1.NodeManagerInfo) {
func AddNodeManager(node *nodemanagerV2.NodeManagerInfo) {
nodeManager := &NodeManager{
Info: node,
IsUsed: false,
......@@ -27,7 +42,7 @@ func AddNodeManager(node *nodeManagerV1.NodeManagerInfo) {
nodeManagerArr = append(nodeManagerArr, nodeManager)
}
func DelNodeManager(node *nodeManagerV1.NodeManagerInfo) {
func DelNodeManager(node *nodemanagerV2.NodeManagerInfo) {
for _, manager := range nodeManagerArr {
if manager.Info.Endpoint == node.Endpoint {
manager.IsExist = false
......@@ -45,7 +60,7 @@ func getUnUsedNodeManagers() []*NodeManager {
return res
}
func isExistNodeManager(nodeManager *nodeManagerV1.NodeManagerInfo) bool {
func isExistNodeManager(nodeManager *nodemanagerV2.NodeManagerInfo) bool {
for _, manager := range nodeManagerArr {
if nodeManager.Endpoint == manager.Info.Endpoint {
//manager.Info.Publickey = nodeManager.Publickey
......@@ -63,26 +78,3 @@ func getNodeManager(endPoint string) *NodeManager {
}
return nil
}
// inputNodeManagerChan connects to a node manager over gRPC and hands the
// resulting client to the monitor loop via monitorNm.NodeManagerClientChan.
// When nodeManagerClient is nil a fresh client record is created and
// remembered in usedNodeManagerClient. Returns false if the gRPC dial fails.
// NOTE(review): a newly created client is appended to usedNodeManagerClient
// BEFORE the connection attempt, so a failed dial still leaves the entry in
// the list — confirm this is intended.
func inputNodeManagerChan(manager *NodeManager, nodeManagerClient *models.NodeManagerClient, isSelect bool, monitorNm *MonitorNm) bool {
if nodeManagerClient == nil {
nodeManagerClient = &models.NodeManagerClient{
PublicKey: manager.Info.Publickey,
Endpoint: manager.Info.Endpoint,
Status: true,
IsSelected: isSelect,
LastHeartTime: time.Now(),
}
usedNodeManagerClient = append(usedNodeManagerClient, nodeManagerClient)
}
// Dial the manager; a nil client means the endpoint is unreachable.
serviceClient := operate.ConnNmGrpc(manager.Info.Endpoint)
if serviceClient == nil {
log.WithField("endPoint", manager.Info.Endpoint).Error("Connect node manager failed")
return false
}
nodeManagerClient.Status = true
nodeManagerClient.Client = serviceClient
// Publish the ready client to the monitor loop and mark the manager used.
monitorNm.NodeManagerClientChan <- nodeManagerClient
manager.IsUsed = true
return true
}
......@@ -6,16 +6,15 @@ import (
"example.com/m/log"
"example.com/m/models"
"example.com/m/operate"
"example.com/m/utils"
"example.com/m/validator"
nodeManagerV1 "github.com/odysseus/odysseus-protocol/gen/proto/go/nodemanager/v1"
nodemanagerV2 "github.com/odysseus/odysseus-protocol/gen/proto/go/nodemanager/v2"
"google.golang.org/grpc"
"time"
)
// MonitorNm aggregates the channels and state used to monitor node managers:
// ready gRPC clients, inbound manager messages, and docker operations.
// (Removed the stale duplicate v1 NodeManagerMsgChan field left over from a
// diff — a struct cannot declare the same field twice.)
type MonitorNm struct {
	NodeManagerClientChan chan *models.NodeManagerClient
	NodeManagerMsgChan    chan *nodemanagerV2.ManagerMessage
	DockerOp              *operate.DockerOp
	IsInit                bool
}
......@@ -23,7 +22,7 @@ type MonitorNm struct {
func NewMonitorNm(DockerOp *operate.DockerOp) *MonitorNm {
return &MonitorNm{
NodeManagerClientChan: make(chan *models.NodeManagerClient, 10),
NodeManagerMsgChan: make(chan *nodeManagerV1.ManagerMessage, 1000),
NodeManagerMsgChan: make(chan *nodemanagerV2.ManagerMessage, 1000),
DockerOp: DockerOp,
IsInit: false,
}
......@@ -58,8 +57,8 @@ func (m *MonitorNm) monitorNmClient() {
log.Info("------------------------Send deviceInfo message ended------------------------")
if len(m.DockerOp.ReportModelIds) == 0 {
params := utils.BuildParams(m.DockerOp.ReportModelIds, []uint64{0})
msgRespWorker.RegisterMsgResp(nodeManager, worker, SubmitResourceMapRes, params)
//params := utils.BuildParams(m.DockerOp.ReportModelIds, []uint64{0})
//msgRespWorker.RegisterMsgResp(nodeManager, worker, SubmitResourceMapRes, params)
}
log.Info("------------------------Send once-off message ended------------------------")
......@@ -121,7 +120,7 @@ func (m *MonitorNm) monitorNodeManagerSeed() {
log.Warn("Connect nm seed service client is nil")
continue
}
list, err := seedServiceClient.ManagerList(context.Background(), &nodeManagerV1.ManagerListRequest{}, grpc.EmptyCallOption{})
list, err := seedServiceClient.ManagerList(context.Background(), &nodemanagerV2.ManagerListRequest{}, grpc.EmptyCallOption{})
if err != nil {
log.WithError(err).Warn("Get manager list failed through nm seed service")
continue
......
......@@ -8,18 +8,18 @@ import (
"example.com/m/utils"
"example.com/m/validator"
"fmt"
nodeManagerV1 "github.com/odysseus/odysseus-protocol/gen/proto/go/nodemanager/v1"
nodemanagerV2 "github.com/odysseus/odysseus-protocol/gen/proto/go/nodemanager/v2"
"time"
)
// NodeManagerHandler dispatches messages between one node-manager gRPC stream
// and the local response/task workers.
// (Removed the stale duplicate v1 worker field left over from a diff — a
// struct cannot declare the same field twice.)
type NodeManagerHandler struct {
	nodeManager   *models.NodeManagerClient
	worker        nodemanagerV2.NodeManagerService_RegisterWorkerClient
	msgRespWorker *RespMsgWorker
	taskMsgWorker *TaskWorker
}
func NewNodeManagerHandler(nodeManager *models.NodeManagerClient, worker nodeManagerV1.NodeManagerService_RegisterWorkerClient, msgRespWorker *RespMsgWorker, taskMsgWorker *TaskWorker) *NodeManagerHandler {
func NewNodeManagerHandler(nodeManager *models.NodeManagerClient, worker nodemanagerV2.NodeManagerService_RegisterWorkerClient, msgRespWorker *RespMsgWorker, taskMsgWorker *TaskWorker) *NodeManagerHandler {
return &NodeManagerHandler{
nodeManager: nodeManager,
worker: worker,
......@@ -28,7 +28,7 @@ func NewNodeManagerHandler(nodeManager *models.NodeManagerClient, worker nodeMan
}
}
func (n *NodeManagerHandler) DistributionMsgWorker(nodeManagerMsgChan chan *nodeManagerV1.ManagerMessage, proofWorker *validator.ProofWorker) {
func (n *NodeManagerHandler) DistributionMsgWorker(nodeManagerMsgChan chan *nodemanagerV2.ManagerMessage, proofWorker *validator.ProofWorker) {
for {
select {
case rev := <-nodeManagerMsgChan:
......@@ -39,6 +39,9 @@ func (n *NodeManagerHandler) DistributionMsgWorker(nodeManagerMsgChan chan *node
}
heartbeatReq := rev.GetHeartbeatRequest()
if heartbeatReq != nil {
unix := time.Unix(int64(heartbeatReq.Timestamp), 0)
since := time.Since(unix)
RunningState.NmDelayTime = since.Milliseconds()
n.nodeManager.UpdateLastHeartTime(time.Now())
params := utils.BuildParams(heartbeatReq.Timestamp)
n.msgRespWorker.RegisterMsgResp(n.nodeManager, n.worker, HeartbeatResp, params)
......@@ -46,9 +49,9 @@ func (n *NodeManagerHandler) DistributionMsgWorker(nodeManagerMsgChan chan *node
continue
}
taskMsg := rev.GetPushTaskMessage()
taskMsg := rev.GetPushTask()
if taskMsg != nil {
go func(msgRespWorker *RespMsgWorker, taskMsgWorker *TaskWorker, taskMsg *nodeManagerV1.PushTaskMessage) {
go func(msgRespWorker *RespMsgWorker, taskMsgWorker *TaskWorker, taskMsg *nodemanagerV2.PushTaskMessage) {
isCanExecute, bootUpTime, queueWaitTime, executeTime := taskMsgWorker.GetAckResp(taskMsg)
ackParams := utils.BuildParams(taskMsg.TaskId, isCanExecute, bootUpTime, queueWaitTime, executeTime)
msgRespWorker.RegisterMsgResp(n.nodeManager, n.worker, RespTaskAck, ackParams)
......@@ -93,36 +96,14 @@ func (n *NodeManagerHandler) DistributionMsgWorker(nodeManagerMsgChan chan *node
taskMsgWorker.LruCache.Add(taskMsg.TaskId+models.ReqHash, reqHash)
taskMsgWorker.LruCache.Add(taskMsg.TaskId+models.RespHash, respHash)
msgRespWorker.RegisterMsgResp(n.nodeManager, n.worker, SubmitResultResp, taskResultParams)
RunningState.CompletedTaskCount++
log.Info("Completed task count: ", RunningState.CompletedTaskCount)
log.Info("--------------taskMsg--------------:", taskMsg)
}(n.msgRespWorker, n.taskMsgWorker, taskMsg)
continue
}
nmResultMsg := rev.GetProofTaskResult()
if nmResultMsg != nil {
//containerSign, _ := taskMsgWorker.LruCache.Get(nmResultMsg.TaskId + models.ContainerSign)
//if !ok {
// log.Error("taskMsgWorker.LruCache.Get failed: ", nmSignMsg.TaskUuid+models.ContainerSign)
//}
//minerSign, _ := taskMsgWorker.LruCache.Get(nmResultMsg.TaskId + models.MinerSign)
//if !ok {
// log.Error("taskMsgWorker.LruCache.Get failed: ", nmSignMsg.TaskUuid+models.MinerSign)
//}
//reqHash, _ := taskMsgWorker.LruCache.Get(nmResultMsg.TaskId + models.ReqHash)
//if !ok {
// log.Error("taskMsgWorker.LruCache.Get failed: ", nmSignMsg.TaskUuid+models.ReqHash)
//}
//respHash, _ := taskMsgWorker.LruCache.Get(nmResultMsg.TaskId + models.RespHash)
//if !ok {
// log.Error("taskMsgWorker.LruCache.Get failed: ", nmSignMsg.TaskUuid+models.RespHash)
//}
//taskType, _ := taskMsgWorker.LruCache.Get(nmResultMsg.TaskId + models.TaskType)
//proofWorker.ProductProof(nmResultMsg, taskType.(uint64), reqHash.([]byte), respHash.([]byte), containerSign.([]byte), minerSign.([]byte))
log.WithField("proof", nmResultMsg).Info("Output proof task result")
continue
}
deviceUsageMsg := rev.GetDeviceUsage()
deviceUsageMsg := rev.GetDeviceUsageRequest()
if deviceUsageMsg != nil {
n.msgRespWorker.RegisterMsgResp(n.nodeManager, n.worker, DeviceUsageResp, nil)
log.Info(deviceUsageMsg)
......@@ -136,13 +117,25 @@ func (n *NodeManagerHandler) DistributionMsgWorker(nodeManagerMsgChan chan *node
continue
}
statusReqMsg := rev.GetStatusRequest()
if statusReqMsg != nil {
n.msgRespWorker.RegisterMsgResp(n.nodeManager, n.worker, StatusResp, nil)
log.Info(statusReqMsg)
gpuUsageMsg := rev.GetGpuUsageRequest()
if gpuUsageMsg != nil {
n.msgRespWorker.RegisterMsgResp(n.nodeManager, n.worker, GpuUsageResp, nil)
log.Info(gpuUsageMsg)
continue
}
modelListMsg := rev.GetModelListRequest()
if modelListMsg != nil {
n.msgRespWorker.RegisterMsgResp(n.nodeManager, n.worker, ModelListResp, nil)
log.Info(modelListMsg)
continue
}
modelOpMsg := rev.GetModelOperateRequest()
if modelOpMsg != nil {
//for _, modelOperate := range modelOpMsg.ModelOperates {
//}
continue
}
goodByeMsg := rev.GetGoodbyeMessage()
if goodByeMsg != nil {
reason := goodByeMsg.GetReason()
......
......@@ -5,22 +5,20 @@ import (
"example.com/m/conf"
"example.com/m/log"
"example.com/m/models"
"fmt"
"example.com/m/utils"
"github.com/docker/docker/libnetwork/bitmap"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/crypto"
nodemanagerV1 "github.com/odysseus/odysseus-protocol/gen/proto/go/nodemanager/v1"
"github.com/shirou/gopsutil/cpu"
nodemanagerV2 "github.com/odysseus/odysseus-protocol/gen/proto/go/nodemanager/v2"
"math/big"
"strconv"
"time"
)
type WorkerMsgHandler func(params ...interface{}) *nodemanagerV1.WorkerMessage
type WorkerMsgHandler func(params ...interface{}) *nodemanagerV2.WorkerMessage
// RespMsgHandler pairs a queued reply handler (and its params) with the
// target node-manager client and register-worker stream.
// (Removed the stale duplicate v1 workerClient field left over from a diff —
// a struct cannot declare the same field twice.)
type RespMsgHandler struct {
	nodeManager  *models.NodeManagerClient
	workerClient nodemanagerV2.NodeManagerService_RegisterWorkerClient
	handler      WorkerMsgHandler
	params       []interface{}
}
......@@ -35,7 +33,7 @@ func NewMsgRespWorker() *RespMsgWorker {
}
}
func (o *RespMsgWorker) RegisterMsgResp(nodeManager *models.NodeManagerClient, workerClient nodemanagerV1.NodeManagerService_RegisterWorkerClient, handler WorkerMsgHandler, params []interface{}) {
func (o *RespMsgWorker) RegisterMsgResp(nodeManager *models.NodeManagerClient, workerClient nodemanagerV2.NodeManagerService_RegisterWorkerClient, handler WorkerMsgHandler, params []interface{}) {
o.MsgPool <- &RespMsgHandler{
nodeManager: nodeManager,
workerClient: workerClient,
......@@ -66,12 +64,12 @@ func (o *RespMsgWorker) SendMsgWorker() {
}
}
func HeartbeatResp(params ...interface{}) *nodemanagerV1.WorkerMessage {
func HeartbeatResp(params ...interface{}) *nodemanagerV2.WorkerMessage {
log.Info("Heartbeat response received params: ", params)
serverTimestamp := params[0].(uint64)
heartRes := &nodemanagerV1.WorkerMessage{
Message: &nodemanagerV1.WorkerMessage_HeartbeatResponse{
HeartbeatResponse: &nodemanagerV1.HeartbeatResponse{
heartRes := &nodemanagerV2.WorkerMessage{
Message: &nodemanagerV2.WorkerMessage_HeartbeatResponse{
HeartbeatResponse: &nodemanagerV2.HeartbeatResponse{
Timestamp: serverTimestamp,
},
},
......@@ -80,9 +78,8 @@ func HeartbeatResp(params ...interface{}) *nodemanagerV1.WorkerMessage {
return heartRes
}
func SubmitResourceMapRes(params ...interface{}) *nodemanagerV1.WorkerMessage {
func SubmitResourceMapRes(params ...interface{}) *nodemanagerV2.WorkerMessage {
log.Info("Submit resource map response received params: ", params)
existModelIdIndexes := params[0].([]uint64)
existMap := bitmap.New(1000000000)
for i := 0; i < len(existModelIdIndexes); i++ {
......@@ -98,7 +95,6 @@ func SubmitResourceMapRes(params ...interface{}) *nodemanagerV1.WorkerMessage {
log.Error("bitmap marshal binary failed with error: ", err)
return nil
}
bootUpModelIdIndexes := params[1].([]uint64)
bootUpMap := bitmap.New(1000000000)
for i := 0; i < len(bootUpModelIdIndexes); i++ {
......@@ -109,26 +105,19 @@ func SubmitResourceMapRes(params ...interface{}) *nodemanagerV1.WorkerMessage {
return nil
}
}
bootUpImage, err := bootUpMap.MarshalBinary()
_, err = bootUpMap.MarshalBinary()
if err != nil {
log.Error("bitmap marshal binary failed with error: ", err)
return nil
}
log.WithField("", existImage).Info("Bit map binary byte")
heartRes := &nodemanagerV1.WorkerMessage{
Message: &nodemanagerV1.WorkerMessage_ResourceMap{
ResourceMap: &nodemanagerV1.SubmitResourceMap{
ResourceMap: existImage,
BootupMap: bootUpImage,
},
},
}
heartRes := &nodemanagerV2.WorkerMessage{}
log.Info("---------------------------------------Send resource map msg ------------------------------------")
return heartRes
}
func RegisterInfoResp(params ...interface{}) *nodemanagerV1.WorkerMessage {
func RegisterInfoResp(params ...interface{}) *nodemanagerV2.WorkerMessage {
log.Info("Register info response received params:", params)
nowTimeStamp := time.Now().Unix()
nowTimeBytes := big.NewInt(nowTimeStamp).Bytes()
......@@ -139,12 +128,17 @@ func RegisterInfoResp(params ...interface{}) *nodemanagerV1.WorkerMessage {
log.WithField("hash", signHash.String()).Info("register message sign result")
sign, _ := crypto.Sign(signHash.Bytes(), conf.GetConfig().SignPrivateKey)
log.Info("register message sign:", common.Bytes2Hex(sign))
nodeInfoRes := &nodemanagerV1.WorkerMessage{
Message: &nodemanagerV1.WorkerMessage_RegisteMessage{
RegisteMessage: &nodemanagerV1.RegisteMessage{
DeviceIp: conf.GetConfig().GetExternalIp(),
MinerPubkey: conf.GetConfig().SignPub,
BenefitAddress: conf.GetConfig().BenefitAddress,
modelsInfo := getModelsInfo()
hardwareInfo := getHardwareInfo()
nodeInfoRes := &nodemanagerV2.WorkerMessage{
Message: &nodemanagerV2.WorkerMessage_RegisteMessage{
RegisteMessage: &nodemanagerV2.RegisteMessage{
Info: &nodemanagerV2.NodeInfo{
MinerPubkey: conf.GetConfig().SignPub,
BenefitAddress: conf.GetConfig().BenefitAddress,
},
Hardware: hardwareInfo,
Models: modelsInfo,
Timestamp: nowTimeStamp,
DeviceSignature: sign,
},
......@@ -154,14 +148,19 @@ func RegisterInfoResp(params ...interface{}) *nodemanagerV1.WorkerMessage {
return nodeInfoRes
}
func NodeInfoResp(params ...interface{}) *nodemanagerV1.WorkerMessage {
func NodeInfoResp(params ...interface{}) *nodemanagerV2.WorkerMessage {
log.Info("Node info response received params:", params)
nodeInfoRes := &nodemanagerV1.WorkerMessage{
Message: &nodemanagerV1.WorkerMessage_NodeInfo{
NodeInfo: &nodemanagerV1.NodeInfoResponse{
MinerPubkey: conf.GetConfig().SignPub,
BenefitAddress: conf.GetConfig().BenefitAddress,
DeviceIp: conf.GetConfig().GetExternalIp(),
hardwareInfo := getHardwareInfo()
modelsInfo := getModelsInfo()
nodeInfoRes := &nodemanagerV2.WorkerMessage{
Message: &nodemanagerV2.WorkerMessage_NodeInfo{
NodeInfo: &nodemanagerV2.NodeInfoResponse{
Info: &nodemanagerV2.NodeInfo{
MinerPubkey: conf.GetConfig().SignPub,
BenefitAddress: conf.GetConfig().BenefitAddress,
},
Hardware: hardwareInfo,
Models: modelsInfo,
},
},
}
......@@ -169,50 +168,13 @@ func NodeInfoResp(params ...interface{}) *nodemanagerV1.WorkerMessage {
return nodeInfoRes
}
func DeviceInfoResp(params ...interface{}) *nodemanagerV1.WorkerMessage {
func DeviceInfoResp(params ...interface{}) *nodemanagerV2.WorkerMessage {
log.Info("Device info response received params:", params)
devices := make([]*nodemanagerV1.DeviceInfo, 0)
cpuInfos, err := cpu.Info()
if err != nil {
log.Error("Error getting CPU info: ", err)
}
for i, cpuInfo := range cpuInfos {
cpuInfo := &nodemanagerV1.DeviceInfo{
DeviceType: fmt.Sprintf("cpu-%d", i),
DeviceModel: cpuInfo.ModelName,
DevicePower: 12,
DeviceParam: strconv.FormatFloat(cpuInfo.Mhz, 'f', 2, 64),
}
devices = append(devices, cpuInfo)
}
cpuInfo := &nodemanagerV1.DeviceInfo{
DeviceType: "cpu-0",
DeviceModel: "xl",
DevicePower: 12,
DeviceParam: "2150",
}
devices = append(devices, cpuInfo)
gpuInfo := &nodemanagerV1.DeviceInfo{
DeviceType: fmt.Sprint("gpu-0"),
DeviceModel: "Nvidia",
DevicePower: 12,
DeviceParam: "1200",
}
devices = append(devices, gpuInfo)
memInfo := &nodemanagerV1.DeviceInfo{
DeviceType: fmt.Sprint("mem-0"),
DeviceModel: "Micron",
DevicePower: 12,
DeviceParam: "1200",
}
devices = append(devices, memInfo)
deviceInfoRes := &nodemanagerV1.WorkerMessage{
Message: &nodemanagerV1.WorkerMessage_DeviceInfo{
DeviceInfo: &nodemanagerV1.DeviceInfoMessage{
Devices: devices,
hardwareInfo := getHardwareInfo()
deviceInfoRes := &nodemanagerV2.WorkerMessage{
Message: &nodemanagerV2.WorkerMessage_DeviceInfo{
DeviceInfo: &nodemanagerV2.DeviceInfoMessage{
Hardware: hardwareInfo,
DeviceSignature: []byte(""),
},
},
......@@ -221,38 +183,72 @@ func DeviceInfoResp(params ...interface{}) *nodemanagerV1.WorkerMessage {
return deviceInfoRes
}
func DeviceUsageResp(params ...interface{}) *nodemanagerV1.WorkerMessage {
func DeviceUsageResp(params ...interface{}) *nodemanagerV2.WorkerMessage {
log.Info("DeviceUsageResp params :", params)
deviceInfoRes := &nodemanagerV1.WorkerMessage{
Message: &nodemanagerV1.WorkerMessage_DeviceUsage{
DeviceUsage: &nodemanagerV1.DeviceUsageResponse{},
info := getHardwareInfo()
ramUsage := int32((1 - info.RAM.Total/info.RAM.Free) * 100)
diskUsage := int32((1 - info.DISK.Total/info.DISK.Free) * 100)
deviceInfoRes := &nodemanagerV2.WorkerMessage{
Message: &nodemanagerV2.WorkerMessage_DeviceUsage{
DeviceUsage: &nodemanagerV2.DeviceUsageResponse{
Usage: &nodemanagerV2.HardwareUsage{
CpuUsage: info.CPU.Usage,
RamUsage: ramUsage,
DiskUsage: diskUsage,
NetBandwidth: info.NET.Bandwidth,
},
},
},
}
log.Info("---------------------------------------Send device usage msg ------------------------------------")
return deviceInfoRes
}
func StatusResp(params ...interface{}) *nodemanagerV1.WorkerMessage {
log.Info("Status resp received params:", params)
statusRes := &nodemanagerV1.WorkerMessage{
Message: &nodemanagerV1.WorkerMessage_Status{
Status: &nodemanagerV1.StatusResponse{
DeviceStatus: []byte("0"),
func GpuUsageResp(params ...interface{}) *nodemanagerV2.WorkerMessage {
log.Info("DeviceUsageResp params :", params)
info := getHardwareInfo()
gpusUsage := make([]*nodemanagerV2.GPUUsage, 0)
for _, gpuInfo := range info.GPU {
usage := &nodemanagerV2.GPUUsage{
Seq: gpuInfo.Seq,
MemFree: gpuInfo.MemFree,
Usage: gpuInfo.Usage,
Temp: gpuInfo.Temp,
PowerRt: gpuInfo.PowerRt,
}
gpusUsage = append(gpusUsage, usage)
}
deviceInfoRes := &nodemanagerV2.WorkerMessage{
Message: &nodemanagerV2.WorkerMessage_GpuUsage{
GpuUsage: &nodemanagerV2.GPUUsageResponse{
Usages: gpusUsage,
},
},
}
log.Info("---------------------------------------Send device status msg ------------------------------------")
return statusRes
log.Info("---------------------------------------Send gpu usage msg ------------------------------------")
return deviceInfoRes
}
// ModelListResp builds the worker message that reports the node's model
// inventory (installed and running models, via getModelsInfo) to the node
// manager. params are accepted for handler-signature uniformity and are
// only logged.
func ModelListResp(params ...interface{}) *nodemanagerV2.WorkerMessage {
	// Fix: the original log line said "DeviceUsageResp params :" — a
	// copy-paste from DeviceUsageResp that made log grepping misleading.
	log.Info("ModelListResp params :", params)
	info := getModelsInfo()
	modelListInfoRes := &nodemanagerV2.WorkerMessage{
		Message: &nodemanagerV2.WorkerMessage_ModelsInfo{
			ModelsInfo: info,
		},
	}
	log.Info("---------------------------------------Send model list msg ------------------------------------")
	return modelListInfoRes
}
func SubmitResultResp(params ...interface{}) *nodemanagerV1.WorkerMessage {
func SubmitResultResp(params ...interface{}) *nodemanagerV2.WorkerMessage {
//log.Info("Handler task submit result resp received params:", params)
taskId := params[0].(string)
containerSign := params[1].([]byte)
minerSign := params[2].([]byte)
taskExecResult := params[3].(*models.TaskResult)
isSuccess := params[4].(bool)
n := &nodemanagerV1.SubmitTaskResult{
n := &nodemanagerV2.SubmitTaskResult{
TaskId: taskId,
ContainerSignature: containerSign,
MinerSignature: minerSign,
......@@ -263,8 +259,8 @@ func SubmitResultResp(params ...interface{}) *nodemanagerV1.WorkerMessage {
IsSuccessed: isSuccess,
TaskResultBody: taskExecResult.TaskRespBody,
}
submitResultMsgRes := &nodemanagerV1.WorkerMessage{
Message: &nodemanagerV1.WorkerMessage_SubmitTaskResult{
submitResultMsgRes := &nodemanagerV2.WorkerMessage{
Message: &nodemanagerV2.WorkerMessage_SubmitTaskResult{
SubmitTaskResult: n,
},
}
......@@ -272,11 +268,11 @@ func SubmitResultResp(params ...interface{}) *nodemanagerV1.WorkerMessage {
return submitResultMsgRes
}
func FetchStandardTaskResp(params ...interface{}) *nodemanagerV1.WorkerMessage {
func FetchStandardTaskResp(params ...interface{}) *nodemanagerV2.WorkerMessage {
//log.Info("Handler task submit result resp received params:", params)
fetchStandardTaskMsgRes := &nodemanagerV1.WorkerMessage{
Message: &nodemanagerV1.WorkerMessage_FetchStandardTask{
FetchStandardTask: &nodemanagerV1.FetchStandardTask{
fetchStandardTaskMsgRes := &nodemanagerV2.WorkerMessage{
Message: &nodemanagerV2.WorkerMessage_FetchStandardTask{
FetchStandardTask: &nodemanagerV2.FetchStandardTask{
TaskType: 998,
},
},
......@@ -285,15 +281,15 @@ func FetchStandardTaskResp(params ...interface{}) *nodemanagerV1.WorkerMessage {
return fetchStandardTaskMsgRes
}
func RespTaskAck(params ...interface{}) *nodemanagerV1.WorkerMessage {
func RespTaskAck(params ...interface{}) *nodemanagerV2.WorkerMessage {
taskId := params[0].(string)
canExecute := params[1].(bool)
bootUpTime := params[2].(int64)
queueWaitTime := params[3].(int64)
executeTime := params[4].(int64)
taskAckMsgRes := &nodemanagerV1.WorkerMessage{
Message: &nodemanagerV1.WorkerMessage_SubmitTaskAck{
SubmitTaskAck: &nodemanagerV1.SubmitTaskAck{
taskAckMsgRes := &nodemanagerV2.WorkerMessage{
Message: &nodemanagerV2.WorkerMessage_SubmitTaskAck{
SubmitTaskAck: &nodemanagerV2.SubmitTaskAck{
TaskId: taskId,
CanExecute: canExecute,
BootUpTime: bootUpTime,
......@@ -306,15 +302,15 @@ func RespTaskAck(params ...interface{}) *nodemanagerV1.WorkerMessage {
return taskAckMsgRes
}
func GoodbyeResp(params ...interface{}) *nodemanagerV1.WorkerMessage {
func GoodbyeResp(params ...interface{}) *nodemanagerV2.WorkerMessage {
log.Info("Goodbye resp received params:", params)
reason := ""
if len(params) > 0 {
reason = params[0].(string)
}
goodbyeMsgRes := &nodemanagerV1.WorkerMessage{
Message: &nodemanagerV1.WorkerMessage_GoodbyeMessage{
GoodbyeMessage: &nodemanagerV1.GoodbyeMessage{
goodbyeMsgRes := &nodemanagerV2.WorkerMessage{
Message: &nodemanagerV2.WorkerMessage_GoodbyeMessage{
GoodbyeMessage: &nodemanagerV2.GoodbyeMessage{
Reason: reason,
},
},
......@@ -322,3 +318,59 @@ func GoodbyeResp(params ...interface{}) *nodemanagerV1.WorkerMessage {
log.Info("---------------------------------------Send good bye msg ------------------------------------")
return goodbyeMsgRes
}
// getModelsInfo assembles the ModelsInfo payload reported to the node
// manager. Model discovery is not wired up in this revision, so both the
// installed and running model lists are empty (non-nil) placeholders.
func getModelsInfo() *nodemanagerV2.ModelsInfo {
	return &nodemanagerV2.ModelsInfo{
		InstalledModels: []*nodemanagerV2.InstalledModel{},
		RunningModels:   []*nodemanagerV2.RunningModel{},
	}
}
// getHardwareInfo collects the node's hardware profile for v2 node-manager
// reporting. CPU and RAM figures come from the external hardware service
// via utils.GetHardwareInfo(); disk totals are aggregated over entries
// mounted at "/", and the MAC/bandwidth are taken from the "docker0"
// network device.
// NOTE(review): utils.GetHardwareInfo() returns nil on any HTTP/decode
// failure, in which case the hardwareInfo.Data accesses below would panic —
// confirm callers can tolerate that or add a nil check.
func getHardwareInfo() *nodemanagerV2.HardwareInfo {
	hardwareInfo := utils.GetHardwareInfo()
	// GPU details are not populated in this revision; an empty list is sent.
	gpusInfo := make([]*nodemanagerV2.GPUInfo, 0)
	// Sum size/free bytes across every disk exposing "/" as a mount point
	// (normally exactly one entry).
	var diskTotal, diskFree int64
	for _, disk := range hardwareInfo.Data.Disk {
		for _, point := range disk.MountPoints {
			if point == "/" {
				diskTotal += disk.SizeBytes
				diskFree += disk.FreeBytes
			}
		}
	}
	// Use the docker bridge interface for MAC address and link speed.
	// NOTE(review): if no interface is named "docker0", macAddr/bandWidth
	// stay zero-valued; if several match, the last one wins — confirm intended.
	var macAddr string
	var bandWidth int32
	for _, net := range hardwareInfo.Data.Networks {
		if net.Device == "docker0" {
			macAddr = net.Mac
			bandWidth = net.Speed
		}
	}
	res := &nodemanagerV2.HardwareInfo{
		CPU: &nodemanagerV2.CPUInfo{
			Model:   hardwareInfo.Data.Cpus.Model,
			Number:  hardwareInfo.Data.Cpus.Number,
			Cores:   hardwareInfo.Data.Cpus.Cores,
			Threads: hardwareInfo.Data.Cpus.Threads,
			Usage:   hardwareInfo.Data.Cpus.Usage,
		},
		GPU: gpusInfo,
		RAM: &nodemanagerV2.MemoryInfo{
			Total: hardwareInfo.Data.Mem.Total,
			Free:  hardwareInfo.Data.Mem.Free,
		},
		DISK: &nodemanagerV2.DiskInfo{
			Total: diskTotal,
			Free:  diskFree,
		},
		NET: &nodemanagerV2.NetInfo{
			Ip:        conf.GetConfig().ExternalIp,
			Mac:       macAddr,
			Bandwidth: bandWidth,
		},
	}
	return res
}
......@@ -75,6 +75,10 @@ func StartMonitor() {
for {
select {
case <-ticker.C:
if !IsRecvTask {
log.Warn("Stop receive task.........")
continue
}
log.Info("Monitoring node manager client thread start......")
for _, client := range usedNodeManagerClient {
log.WithField("Endpoint", client.Endpoint).WithField("LastHeartTime", client.LastHeartTime).WithField("Is Del", client.IsDel).WithField("Status", client.Status).Info("Monitoring node manager client thread")
......@@ -94,38 +98,54 @@ func StartMonitor() {
log.Warn("The managerClient is not exist:", managerClient.Endpoint)
continue
}
// TODO: 重试连接三次
//managerClient.UpdateLastHeartTime(time.Now())
managerClient.IsDel = true
isSuccess := inputNodeManagerChan(manager, nil, managerClient.IsSelected, monitorNm)
log.WithField("is success", isSuccess).Warn("Try to connect node manager client:", manager.Info.Endpoint)
usedNodeManagerClient = utils.DeleteNm(usedNodeManagerClient, i)
if isSuccess {
log.Info("Connect node manager client success:", manager.Info.Endpoint)
log.WithField("Endpoint", managerClient.Endpoint).Warn("Node manager client is deleted")
// 重试连接三次
tryConnectIsSuccess := false
for j := 1; j <= 3; j++ {
tryConnectIsSuccess = inputNodeManagerChan(manager, nil, managerClient.IsSelected, monitorNm)
log.WithField("is success", tryConnectIsSuccess).Warn("Try to connect node manager client:", manager.Info.Endpoint)
if tryConnectIsSuccess {
log.Info("Connect node manager client success:", manager.Info.Endpoint)
break
}
log.WithField("count", i).WithField("endpoint", manager.Info.Endpoint).Warn("Retrying node manager client connect")
}
if tryConnectIsSuccess {
continue
}
//managerClient.IsDel = true
log.WithField("Endpoint", managerClient.Endpoint).Warn("Node manager client is deleted")
// 从未使用的nm列表中选取新的nm进行连接
unUsedNodeManagers := getUnUsedNodeManagers()
if unUsedNodeManagers == nil || len(unUsedNodeManagers) == 0 {
log.Warn("There is no node manager available at this time")
break
}
// 从已存在的节点中选择一个提交DeviceInfo
isSelect := false
if managerClient.IsSelected {
for _, client := range usedNodeManagerClient {
if client.Status && !client.IsDel {
isSelect = true
client.IsSelected = true
break
}
}
}
for i := 0; i < len(unUsedNodeManagers); i++ {
if !isSelect {
isSelect = true
}
randomNum := utils.GenerateRandomNumber(conf.GetConfig().SignPrivateKey, int64(len(nodeManagerArr)))
unUsedManager := unUsedNodeManagers[randomNum.Int64()]
isSuccess := inputNodeManagerChan(unUsedManager, nil, false, monitorNm)
if !isSuccess {
log.Warn("Connect unused node manager client error:", manager.Info.Endpoint)
isSuccess := inputNodeManagerChan(unUsedManager, nil, isSelect, monitorNm)
if isSuccess {
log.Info("Connect unused node manager client successful:", manager.Info.Endpoint)
break
} else {
log.Warn("Connect unused node manager client error:", manager.Info.Endpoint)
}
}
}
......@@ -133,3 +153,29 @@ func StartMonitor() {
}
}
}
// inputNodeManagerChan registers (or re-registers) a node-manager client and
// hands it to the monitor goroutine via monitorNm.NodeManagerClientChan.
//
// If nodeManagerClient is nil, a fresh client record is built from the
// manager's info and appended to the package-level usedNodeManagerClient
// slice. Side effect: that global list grows even if the gRPC dial below
// fails. isSelect marks this client as the currently selected one; when
// true, RunningState.NmIpAddr is updated to its endpoint.
//
// Returns false when the gRPC connection to the endpoint cannot be made,
// true once the connected client has been pushed to the monitor channel.
// NOTE(review): the channel send blocks if the monitor loop is not
// receiving — confirm the consumer is always running before this is called.
func inputNodeManagerChan(manager *NodeManager, nodeManagerClient *models.NodeManagerClient, isSelect bool, monitorNm *MonitorNm) bool {
	if nodeManagerClient == nil {
		nodeManagerClient = &models.NodeManagerClient{
			PublicKey:     manager.Info.Publickey,
			Endpoint:      manager.Info.Endpoint,
			Status:        true,
			IsSelected:    isSelect,
			LastHeartTime: time.Now(),
		}
		usedNodeManagerClient = append(usedNodeManagerClient, nodeManagerClient)
	}
	// Dial the node manager; bail out early if it is unreachable.
	serviceClient := operate.ConnNmGrpc(manager.Info.Endpoint)
	if serviceClient == nil {
		log.WithField("endPoint", manager.Info.Endpoint).Error("Connect node manager failed")
		return false
	}
	nodeManagerClient.Status = true
	nodeManagerClient.Client = serviceClient
	// Hand the connected client to the monitor loop.
	monitorNm.NodeManagerClientChan <- nodeManagerClient
	manager.IsUsed = true
	if isSelect {
		RunningState.NmIpAddr = nodeManagerClient.Endpoint
	}
	return true
}
......@@ -13,7 +13,7 @@ import (
"github.com/ethereum/go-ethereum/crypto"
"github.com/golang/groupcache/lru"
baseV1 "github.com/odysseus/odysseus-protocol/gen/proto/go/base/v1"
nodeManagerV1 "github.com/odysseus/odysseus-protocol/gen/proto/go/nodemanager/v1"
nodemanagerV2 "github.com/odysseus/odysseus-protocol/gen/proto/go/nodemanager/v2"
"io"
"math/rand"
"mime/multipart"
......@@ -30,7 +30,7 @@ type TaskWorker struct {
LruCache *lru.Cache
DockerOp *operate.DockerOp
CmdOp *operate.Command
TaskMsg chan *nodeManagerV1.PushTaskMessage
TaskMsg chan *nodemanagerV2.PushTaskMessage
IsExecAiTask bool
IsExecStandardTask bool
ExecTaskIdIsFinished *sync.Map
......@@ -39,7 +39,7 @@ type TaskWorker struct {
}
type TaskOp struct {
taskMsg *nodeManagerV1.PushTaskMessage
taskMsg *nodemanagerV2.PushTaskMessage
taskCmd *models.TaskCmd
taskExecResult *models.TaskResult
taskParam *models.TaskParam
......@@ -54,7 +54,7 @@ func NewTaskWorker(op *operate.DockerOp) *TaskWorker {
Wg: &sync.WaitGroup{},
LruCache: lru.New(100),
DockerOp: op,
TaskMsg: make(chan *nodeManagerV1.PushTaskMessage, 0),
TaskMsg: make(chan *nodemanagerV2.PushTaskMessage, 0),
IsExecAiTask: false,
ExecTaskIdIsFinished: &sync.Map{},
}
......@@ -93,7 +93,7 @@ func (t *TaskWorker) DistributionTaskWorker(runCount int) {
}
}
func (t *TaskWorker) GetMinerSign(msg *nodeManagerV1.PushTaskMessage, taskResult []byte) ([]byte, []byte, []byte) {
func (t *TaskWorker) GetMinerSign(msg *nodemanagerV2.PushTaskMessage, taskResult []byte) ([]byte, []byte, []byte) {
reqHash := crypto.Keccak256Hash(msg.TaskParam)
respHash := crypto.Keccak256Hash(taskResult)
signHash := crypto.Keccak256Hash(bytes.NewBufferString(msg.TaskId).Bytes(), reqHash.Bytes(), respHash.Bytes())
......@@ -106,13 +106,13 @@ func (t *TaskWorker) GetMinerSign(msg *nodeManagerV1.PushTaskMessage, taskResult
return reqHash.Bytes(), respHash.Bytes(), sign
}
func (t *TaskWorker) SystemTaskHandler(taskMsg *nodeManagerV1.PushTaskMessage) {
func (t *TaskWorker) SystemTaskHandler(taskMsg *nodemanagerV2.PushTaskMessage) {
defer t.Wg.Done()
log.Info("received systemTask--------------------------------")
}
func (t *TaskWorker) CustomTaskHandler(taskMsg *nodeManagerV1.PushTaskMessage) {
func (t *TaskWorker) CustomTaskHandler(taskMsg *nodemanagerV2.PushTaskMessage) {
defer t.Wg.Done()
_, err := t.DockerOp.PsImages()
if err != nil {
......@@ -122,7 +122,7 @@ func (t *TaskWorker) CustomTaskHandler(taskMsg *nodeManagerV1.PushTaskMessage) {
log.Info("received customTask--------------------------------")
}
func (t *TaskWorker) ComputeTaskHandler(taskMsg *nodeManagerV1.PushTaskMessage) {
func (t *TaskWorker) ComputeTaskHandler(taskMsg *nodemanagerV2.PushTaskMessage) {
defer t.Wg.Done()
t.lastExecTaskStartTime = time.Now()
t.checkLastTaskExecStatus(taskMsg)
......@@ -202,7 +202,7 @@ func (t *TaskWorker) ComputeTaskHandler(taskMsg *nodeManagerV1.PushTaskMessage)
log.Info("----------------------Compute task exec done--------------------------------")
}
func (t *TaskWorker) GetAckResp(taskMsg *nodeManagerV1.PushTaskMessage) (isCanExecute bool, bootUpTime, queueWaitTime, executeTime int64) {
func (t *TaskWorker) GetAckResp(taskMsg *nodemanagerV2.PushTaskMessage) (isCanExecute bool, bootUpTime, queueWaitTime, executeTime int64) {
if t.IsExecStandardTask {
isCanExecute = true
return
......@@ -282,7 +282,7 @@ func (t *TaskWorker) foundImageIsRunning(imageName string) (bool, string, uint16
return false, "", 0
}
func (t *TaskWorker) checkLastTaskExecStatus(taskMsg *nodeManagerV1.PushTaskMessage) {
func (t *TaskWorker) checkLastTaskExecStatus(taskMsg *nodemanagerV2.PushTaskMessage) {
if taskMsg.TaskKind == baseV1.TaskKind_ComputeTask {
t.IsExecAiTask = true
if t.IsExecStandardTask {
......@@ -802,7 +802,17 @@ func parseData(readBody []byte) interface{} {
var audioOutput *models.BarkOutPut
if err := json.Unmarshal(m["output"], &audioOutput); err != nil {
log.WithField("err", err).Warn("parse audioOutput output filed failed:")
return nil
swinirOutPutArr := make([]*models.SwinirOutPut, 0)
if err := json.Unmarshal(m["output"], &audioOutput); err != nil {
log.WithField("err", err).Warn("parse swinirOutput output filed failed:")
return nil
} else {
res := make([]string, 0)
for _, out := range swinirOutPutArr {
res = append(res, out.File)
}
return res
}
} else {
return audioOutput.AudioOut
}
......
......@@ -3,16 +3,16 @@ package operate
import (
"example.com/m/log"
"github.com/docker/docker/client"
nodemanagerV1 "github.com/odysseus/odysseus-protocol/gen/proto/go/nodemanager/v1"
nodemanagerV2 "github.com/odysseus/odysseus-protocol/gen/proto/go/nodemanager/v2"
witnessv1 "github.com/odysseus/odysseus-protocol/gen/proto/go/witness/v1"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
)
func ConnNmGrpc(endpoint string) nodemanagerV1.NodeManagerServiceClient {
func ConnNmGrpc(endpoint string) nodemanagerV2.NodeManagerServiceClient {
conn := connGrpc(endpoint)
if conn != nil {
return nodemanagerV1.NewNodeManagerServiceClient(conn)
return nodemanagerV2.NewNodeManagerServiceClient(conn)
}
return nil
}
......
......@@ -13,10 +13,12 @@ import (
"github.com/docker/docker/client"
"github.com/docker/go-connections/nat"
"github.com/ethereum/go-ethereum/common"
nodemanagerv1 "github.com/odysseus/odysseus-protocol/gen/proto/go/nodemanager/v1"
nodemanagerV2 "github.com/odysseus/odysseus-protocol/gen/proto/go/nodemanager/v2"
"io"
"net/http"
"os/exec"
"strconv"
"strings"
"time"
)
......@@ -68,7 +70,7 @@ func (d *DockerOp) GetImageInfo(imageName string) *models.ModelInfo {
return nil
}
func (d *DockerOp) GetContainerSign(taskMsg *nodemanagerv1.PushTaskMessage, taskRes []byte) []byte {
func (d *DockerOp) GetContainerSign(taskMsg *nodemanagerV2.PushTaskMessage, taskRes []byte) []byte {
reqBody := &models.TaskReq{
TaskId: taskMsg.TaskId,
TaskParam: taskMsg.TaskParam,
......@@ -309,10 +311,10 @@ func (d *DockerOp) PullImage(info *models.ModelInfo) {
}
}(response)
// 读取拉取镜像的输出
if _, err = io.ReadAll(response); err != nil {
log.WithError(err).Error("Read image pull response failed")
return
}
//if _, err = io.ReadAll(response); err != nil {
// log.WithError(err).Error("Read image pull response failed")
// return
//}
log.Info("Image pulled successfully.")
}
......@@ -339,3 +341,29 @@ func (d *DockerOp) inspectContainer(containerId string) *types.ContainerJSON {
log.Info("Image deleted successfully.")
return &containerJson
}
// GetDockerInfo reports root-filesystem usage by shelling out to `df /`.
// It returns (total, used, available, usagePercent) in the units df prints
// (1K blocks for sizes, percent for usage), or an error when the command
// fails or its output cannot be parsed.
// NOTE(review): depends on the df column layout and is Linux/Unix-only.
func (d *DockerOp) GetDockerInfo() (int64, int64, int64, int64, error) {
	cmd := exec.Command("df", "/")
	out, err := cmd.Output()
	if err != nil {
		return 0, 0, 0, 0, fmt.Errorf("exec cmd 'df /' error:%v", err)
	}
	// df prints a header row first; the second line holds the "/" stats.
	lines := strings.Split(string(out), "\n")
	if len(lines) < 2 {
		return 0, 0, 0, 0, fmt.Errorf("get disk size failed")
	}
	fields := strings.Fields(lines[1])
	if len(fields) < 6 {
		return 0, 0, 0, 0, fmt.Errorf("get disk size failed")
	}
	log.WithField("Filesystem:", fields[0]).WithField("Size:", fields[1]).WithField("Used:", fields[2]).WithField("Avail:", fields[3]).WithField("Use%:", fields[4]).WithField("Mounted on:", fields[5]).Info()
	// Fix: the original only checked the last err, so a failed parse of
	// Size/Used/Avail could silently yield zero values. Check each one.
	totalSize, err := strconv.ParseInt(fields[1], 10, 64)
	if err != nil {
		return 0, 0, 0, 0, fmt.Errorf("failed to parse disk size error:%v", err)
	}
	usedSize, err := strconv.ParseInt(fields[2], 10, 64)
	if err != nil {
		return 0, 0, 0, 0, fmt.Errorf("failed to parse disk size error:%v", err)
	}
	availSize, err := strconv.ParseInt(fields[3], 10, 64)
	if err != nil {
		return 0, 0, 0, 0, fmt.Errorf("failed to parse disk size error:%v", err)
	}
	// Fix: the Use% column carries a trailing '%' (e.g. "42%"); the original
	// passed it straight to ParseInt, which always fails. Strip it first.
	usageSize, err := strconv.ParseInt(strings.TrimSuffix(fields[4], "%"), 10, 64)
	if err != nil {
		return 0, 0, 0, 0, fmt.Errorf("failed to parse disk size error:%v", err)
	}
	return totalSize, usedSize, availSize, usageSize, nil
}
......@@ -6,7 +6,16 @@ import (
)
// init registers the power-node HTTP routes with beego.
// NOTE(review): this block mixes the legacy un-prefixed paths with the new
// /api/v1 ones; confirm whether the first three (un-prefixed) registrations
// are still meant to be served or are leftovers from the pre-v2 API.
func init() {
	// Legacy endpoints (pre /api/v1 prefix).
	beego.Router("/power/node/get/nm", &controllers.NodeController{}, "post:GetNodeManagers")
	beego.Router("/power/node/set/nm/seed", &controllers.NodeController{}, "post:SetNmSeed")
	beego.Router("/power/node/set/rewardAddr", &controllers.NodeController{}, "post:SetRewardAddress")
	// v1 node-management endpoints.
	beego.Router("/api/v1/power/list/nm", &controllers.NodeController{}, "get:GetNodeManagers")
	beego.Router("/api/v1/power/set/nm/seed", &controllers.NodeController{}, "post:SetNmSeed")
	beego.Router("/api/v1/power/join/benefit/acc", &controllers.NodeController{}, "post:SetBenefitAddress")
	beego.Router("/api/v1/power/list/benefit/acc", &controllers.NodeController{}, "get:ListHistoryBenefitAddress")
	beego.Router("/api/v1/power/update/recv/status", &controllers.NodeController{}, "post:UpdateRecvStatus")
	beego.Router("/api/v1/power/get/recv/status", &controllers.NodeController{}, "get:GetRecvStatus")
	beego.Router("/api/v1/power/get/conf", &controllers.NodeController{}, "get:GetConfigInfo")
	// v1 state/hardware reporting endpoints.
	beego.Router("/api/v1/power/get/running/state", &controllers.StateController{}, "get:GetRunningState")
	beego.Router("/api/v1/power/get/worker/info", &controllers.StateController{}, "get:GetWorkerInfo")
	beego.Router("/api/v1/power/list/gpu/info", &controllers.StateController{}, "get:GetListGpuInfo")
	beego.Router("/api/v1/power/get/gpu/info", &controllers.StateController{}, "post:GetGpuUsageInfo")
	beego.Router("/api/v1/power/get/hardware/info", &controllers.StateController{}, "post:GetOtherHardwareInfo")
}
......@@ -2,6 +2,9 @@ package test
import (
"fmt"
"net"
"os/exec"
"strings"
"sync"
"testing"
"time"
......@@ -62,6 +65,49 @@ func Test_initConfig(t *testing.T) {
}
}
}
// Test_getDiskInfo runs `df /` and prints the parsed columns of the root
// filesystem line. It is a manual/diagnostic test with no assertions.
func Test_getDiskInfo(t *testing.T) {
	output, err := exec.Command("df", "/").Output()
	if err != nil {
		fmt.Println("Error:", err)
		return
	}
	// First line is the df header; the second describes "/".
	rows := strings.Split(string(output), "\n")
	if len(rows) < 2 {
		return
	}
	cols := strings.Fields(rows[1])
	if len(cols) < 6 {
		return
	}
	labels := []string{"Filesystem:", "Size:", "Used:", "Avail:", "Use%:", "Mounted on:"}
	for i, label := range labels {
		fmt.Println(label, cols[i])
	}
}
// TestNetworkDelay times how long net.Dial takes to set up an ip4:icmp
// "connection" to a fixed host.
// NOTE(review): no ICMP echo is ever sent over conn — the measured duration
// is only local raw-socket setup, not a network round trip, so the "Ping"
// label is misleading. Opening a raw ICMP socket also typically requires
// elevated privileges, so on unprivileged runs this prints the Dial error
// and returns.
func TestNetworkDelay(t *testing.T) {
	start := time.Now()
	ip := "43.198.29.144"
	conn, err := net.Dial("ip4:icmp", ip)
	if err != nil {
		fmt.Println("Error:", err)
		return
	}
	defer conn.Close()
	duration := time.Since(start)
	fmt.Printf("Ping %s: %v\n", ip, duration)
}
func Sell(wg *sync.WaitGroup) {
defer wg.Done()
time.Sleep(6 * time.Second)
......
......@@ -2,6 +2,8 @@ package utils
import (
"bytes"
"crypto/aes"
"crypto/cipher"
"crypto/ecdsa"
"crypto/elliptic"
"crypto/rand"
......@@ -9,9 +11,11 @@ import (
"encoding/json"
"example.com/m/log"
"example.com/m/models"
//"example.com/m/nm"
"fmt"
"github.com/ethereum/go-ethereum/accounts/keystore"
"github.com/ethereum/go-ethereum/crypto"
"io"
"math/big"
"net/http"
"net/url"
......@@ -217,3 +221,118 @@ func DeleteNm(nodeArr []*models.NodeManagerClient, target int) []*models.NodeMan
nodeArr[target] = nodeArr[len(nodeArr)-1]
return nodeArr[:len(nodeArr)-1]
}
// WriteBenefitFile persists the history of benefit (reward) addresses to
// models.HistoryBenefitAddressDirectory, AES-GCM encrypted under the
// package-wide models.EncryptionKey. Returns any marshal, encryption, or
// file-write error.
func WriteBenefitFile(historyBenefitAcc []string) error {
	log.Info("WritingBenefitFile........")
	key := []byte(models.EncryptionKey)
	serializedData, err := json.Marshal(historyBenefitAcc)
	if err != nil {
		// Fix: the original logged an empty string and dropped the marshal
		// error from the returned message; keep the cause in both.
		log.Error("json marshal HistoryBenefitAcc failed:", err)
		return fmt.Errorf("json marshal HistoryBenefitAcc error:%v", err)
	}
	return encryptAndWriteToFile(key, serializedData, models.HistoryBenefitAddressDirectory)
}
// ReadBenefitFile loads and decrypts the stored benefit-address history
// from models.HistoryBenefitAddressDirectory using models.EncryptionKey,
// returning the decoded address list.
func ReadBenefitFile() ([]string, error) {
	log.Info("ReadBenefitFile........")
	plaintext, err := readAndDecryptFile([]byte(models.EncryptionKey), models.HistoryBenefitAddressDirectory)
	if err != nil {
		return nil, err
	}
	accounts := make([]string, 0)
	if err = json.Unmarshal(plaintext, &accounts); err != nil {
		return nil, err
	}
	return accounts, nil
}
func encryptAndWriteToFile(key []byte, serializedData []byte, filename string) error {
// 创建 AES 加密器
block, err := aes.NewCipher(key)
if err != nil {
return fmt.Errorf("new cipher error:%v", err)
}
// 使用 AES-GCM 模式加密
gcm, err := cipher.NewGCM(block)
if err != nil {
return fmt.Errorf("new GCM error:%v", err)
}
// 生成随机 nonce
nonce := make([]byte, gcm.NonceSize())
if _, err = io.ReadFull(rand.Reader, nonce); err != nil {
return fmt.Errorf("failed to read nonce:%v", err)
}
// 加密数据
encryptedData := gcm.Seal(nonce, nonce, serializedData, nil)
// 写入加密后的数据到文件
if err := os.WriteFile(filename, encryptedData, 0644); err != nil {
return fmt.Errorf("write file failed %v", err)
}
return nil
}
func readAndDecryptFile(key []byte, filename string) ([]byte, error) {
// 读取加密文件内容
encryptedData, err := os.ReadFile(filename)
if err != nil {
return nil, err
}
// 创建 AES 解密器
block, err := aes.NewCipher(key)
if err != nil {
return nil, err
}
// 使用 AES-GCM 模式解密
gcm, err := cipher.NewGCM(block)
if err != nil {
return nil, err
}
// 从加密文件中提取 nonce
nonceSize := gcm.NonceSize()
if len(encryptedData) < nonceSize {
return nil, err
}
nonce, encryptedData := encryptedData[:nonceSize], encryptedData[nonceSize:]
// 解密数据
decryptedData, err := gcm.Open(nil, nonce, encryptedData, nil)
if err != nil {
return nil, err
}
return decryptedData, nil
}
// GetHardwareInfo fetches the host hardware profile from the external
// hardware-info service and decodes it into models.HardwareInfoRep.
// Returns nil on any transport or decode failure, so callers must
// nil-check the result.
// NOTE(review): the service address is hard-coded and the default client
// has no timeout — consider moving both to configuration.
func GetHardwareInfo() *models.HardwareInfoRep {
	resp, err := http.Get("http://124.193.167.71:5000/hw")
	if err != nil {
		log.Error("Error creating request:", err)
		return nil
	}
	// Fix: the original never closed the response body, leaking the
	// connection; close it so the transport can reuse it.
	defer resp.Body.Close()
	body, err := io.ReadAll(resp.Body)
	if err != nil {
		log.Error("io.ReadAll failed:", err)
		return nil
	}
	res := &models.HardwareInfoRep{}
	if err = json.Unmarshal(body, res); err != nil {
		log.Error("json.Unmarshal failed:", err)
		return nil
	}
	return res
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment