Commit 1bfec526 authored by duanjinfei's avatar duanjinfei

seconds commit

parent 6eeefaf6
...@@ -2,3 +2,4 @@ ...@@ -2,3 +2,4 @@
logs logs
*.DS_Store *.DS_Store
mydb mydb
keystore
\ No newline at end of file
...@@ -5,6 +5,7 @@ import ( ...@@ -5,6 +5,7 @@ import (
"example.com/m/conf" "example.com/m/conf"
"example.com/m/log" "example.com/m/log"
"example.com/m/nm" "example.com/m/nm"
"example.com/m/utils"
"fmt" "fmt"
"github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/crypto" "github.com/ethereum/go-ethereum/crypto"
...@@ -39,6 +40,7 @@ func init() { ...@@ -39,6 +40,7 @@ func init() {
// initConfig reads in conf file and ENV variables if set. // initConfig reads in conf file and ENV variables if set.
func initConfig() { func initConfig() {
// 设置配置文件的名称(不包含扩展名) // 设置配置文件的名称(不包含扩展名)
viper.SetConfigName("config") viper.SetConfigName("config")
...@@ -46,7 +48,7 @@ func initConfig() { ...@@ -46,7 +48,7 @@ func initConfig() {
viper.SetConfigType("json") viper.SetConfigType("json")
// 设置配置文件所在的目录 // 设置配置文件所在的目录
viper.AddConfigPath(".") viper.AddConfigPath("./")
// 读取配置文件 // 读取配置文件
if err := viper.ReadInConfig(); err != nil { if err := viper.ReadInConfig(); err != nil {
...@@ -74,15 +76,16 @@ func initConfig() { ...@@ -74,15 +76,16 @@ func initConfig() {
panic("Json unmarshal cfg error") panic("Json unmarshal cfg error")
} }
_cfg.HeartRespTimeMillis = _cfg.HeartRespTimeSecond * 60 * 60 * 1000 _cfg.HeartRespTimeMillis = _cfg.HeartRespTimeSecond * 60 * 60 * 1000
prvKey, err := crypto.HexToECDSA(_cfg.SignPrv) prvKey, err := utils.GetPrv()
if err != nil { if err != nil {
return panic("get prv error or delete keystore after restart")
} }
_cfg.SignPrivateKey = prvKey _cfg.SignPrivateKey = prvKey
ecdsaPub := prvKey.PublicKey ecdsaPub := prvKey.PublicKey
publicAddr := crypto.PubkeyToAddress(ecdsaPub)
_cfg.SignPub = common.Bytes2Hex(crypto.FromECDSAPub(&ecdsaPub)) _cfg.SignPub = common.Bytes2Hex(crypto.FromECDSAPub(&ecdsaPub))
log.Info(_cfg.SignPub) log.Info("PublicKey", _cfg.SignPub)
publicAddr := crypto.PubkeyToAddress(ecdsaPub)
log.Info("publicAddr:", publicAddr)
_cfg.SignPublicAddress = publicAddr _cfg.SignPublicAddress = publicAddr
} }
......
...@@ -4,6 +4,7 @@ import ( ...@@ -4,6 +4,7 @@ import (
"crypto/ecdsa" "crypto/ecdsa"
"encoding/json" "encoding/json"
"example.com/m/log" "example.com/m/log"
"example.com/m/utils"
"fmt" "fmt"
"github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/crypto" "github.com/ethereum/go-ethereum/crypto"
...@@ -13,7 +14,7 @@ import ( ...@@ -13,7 +14,7 @@ import (
type Config struct { type Config struct {
NmSeed string `json:"nm_seed"` NmSeed string `json:"nm_seed"`
SignPrv string `json:"sign_prv"` SignPrv string
SignPrivateKey *ecdsa.PrivateKey SignPrivateKey *ecdsa.PrivateKey
SignPub string SignPub string
SignPublicAddress common.Address SignPublicAddress common.Address
...@@ -24,6 +25,7 @@ type Config struct { ...@@ -24,6 +25,7 @@ type Config struct {
BenefitAddress string `json:"benefit_address"` BenefitAddress string `json:"benefit_address"`
ContainerNum int64 `json:"container_num"` ContainerNum int64 `json:"container_num"`
NodeManagerNum int64 `json:"node_manager_num"` NodeManagerNum int64 `json:"node_manager_num"`
ChainID int64 `json:"chain_id"`
} }
var _cfg *Config = nil var _cfg *Config = nil
...@@ -64,18 +66,31 @@ func init() { ...@@ -64,18 +66,31 @@ func init() {
panic("Json unmarshal cfg error") panic("Json unmarshal cfg error")
} }
_cfg.HeartRespTimeMillis = _cfg.HeartRespTimeSecond * 60 * 60 * 1000 _cfg.HeartRespTimeMillis = _cfg.HeartRespTimeSecond * 60 * 60 * 1000
prvKey, err := crypto.HexToECDSA(_cfg.SignPrv) prvKey, err := utils.GetPrv()
if err != nil { if err != nil {
return panic("get prv error or delete keystore after restart")
} }
_cfg.SignPrivateKey = prvKey _cfg.SignPrivateKey = prvKey
ecdsaPub := prvKey.PublicKey ecdsaPub := prvKey.PublicKey
publicAddr := crypto.PubkeyToAddress(ecdsaPub)
_cfg.SignPub = common.Bytes2Hex(crypto.FromECDSAPub(&ecdsaPub)) _cfg.SignPub = common.Bytes2Hex(crypto.FromECDSAPub(&ecdsaPub))
log.Info(_cfg.SignPub) log.Info("PublicKey", _cfg.SignPub)
publicAddr := crypto.PubkeyToAddress(ecdsaPub)
log.Info("publicAddr:", publicAddr)
_cfg.SignPublicAddress = publicAddr _cfg.SignPublicAddress = publicAddr
} }
func GetConfig() *Config { func GetConfig() *Config {
return _cfg return _cfg
} }
func (c *Config) SetRewardAddress(addr string) bool {
isAddr := common.IsHexAddress(addr)
if isAddr {
c.BenefitAddress = addr
}
return isAddr
}
func (c *Config) SetNmSeed(seed string) {
c.NmSeed = seed
}
...@@ -6,5 +6,6 @@ ...@@ -6,5 +6,6 @@
"node_manager_num": 1, "node_manager_num": 1,
"heart_response": 30, "heart_response": 30,
"task_validator_time": 1, "task_validator_time": 1,
"container_num": 1 "container_num": 1,
"chain_id": 100
} }
\ No newline at end of file
package controllers package controllers
import ( import (
"encoding/json"
"example.com/m/conf"
"example.com/m/models"
"example.com/m/nm" "example.com/m/nm"
"example.com/m/operate"
nodeManagerV1 "github.com/odysseus/odysseus-protocol/gen/proto/go/nodemanager/v1" nodeManagerV1 "github.com/odysseus/odysseus-protocol/gen/proto/go/nodemanager/v1"
"io"
) )
type NodeController struct { type NodeController struct {
...@@ -10,18 +15,74 @@ type NodeController struct { ...@@ -10,18 +15,74 @@ type NodeController struct {
} }
func (c *NodeController) SetNmSeed() { func (c *NodeController) SetNmSeed() {
bodyReq, err := io.ReadAll(c.Ctx.Request.Body)
if err != nil || bodyReq == nil {
c.ResponseInfo(500, "param error", "")
return
}
req := &models.SeedUrl{}
err = json.Unmarshal(bodyReq, req)
if err != nil {
c.ResponseInfo(500, "param error", "")
return
}
serviceClient := operate.ConnNmGrpc(req.Seed)
if serviceClient == nil {
c.ResponseInfo(500, "seed is not connected", "")
}
conf.GetConfig().SetNmSeed(req.Seed)
c.ResponseInfo(200, "set seed successful", "")
}
func (c *NodeController) SetRewardAddress() {
bodyReq, err := io.ReadAll(c.Ctx.Request.Body)
if err != nil || bodyReq == nil {
c.ResponseInfo(500, "param error", "")
return
}
req := &models.RewardAddress{}
err = json.Unmarshal(bodyReq, req)
if err != nil {
c.ResponseInfo(500, "param error", "")
return
}
if !conf.GetConfig().SetRewardAddress(req.Address) {
c.ResponseInfo(500, "param is not address", "")
}
c.ResponseInfo(200, "sign successful", "") c.ResponseInfo(200, "sign successful", "")
} }
func (c *NodeController) AddNodeManager() { func (c *NodeController) AddNodeManager() {
bodyReq, err := io.ReadAll(c.Ctx.Request.Body)
if err != nil || bodyReq == nil {
c.ResponseInfo(500, "param error", "")
return
}
req := &models.NodeManagerReq{}
err = json.Unmarshal(bodyReq, req)
if err != nil || req.PublicKey == "" || req.EndPoint == "" {
c.ResponseInfo(500, "param error", "")
return
}
nodeManager := &nodeManagerV1.NodeManagerInfo{ nodeManager := &nodeManagerV1.NodeManagerInfo{
Publickey: "public", Publickey: req.PublicKey,
Endpoint: "http://localhost", Endpoint: req.EndPoint,
} }
nm.AddNodeManager(nodeManager) nm.AddNodeManager(nodeManager)
c.ResponseInfo(200, "sign successful", "")
} }
func (c *NodeController) GetNodeManager() { func (c *NodeController) GetNodeManager() {
manager := nm.GetNodeManagers() manager := nm.GetNodeManagers()
c.ResponseInfo(200, "sign successful", manager) res := make([]*nm.NodeManager, 0)
for _, nodeManager := range manager {
if !nodeManager.IsExist {
continue
}
if nodeManager.IsUsed {
res = append(res, nodeManager)
}
}
c.ResponseInfo(200, "Get used node manager successful", res)
} }
...@@ -4,9 +4,9 @@ go 1.19 ...@@ -4,9 +4,9 @@ go 1.19
require ( require (
github.com/astaxie/beego v1.12.3 github.com/astaxie/beego v1.12.3
github.com/decred/dcrd/dcrec/secp256k1/v4 v4.0.1
github.com/docker/docker v24.0.7+incompatible github.com/docker/docker v24.0.7+incompatible
github.com/ethereum/go-ethereum v1.13.10 github.com/docker/go-connections v0.5.0
github.com/ethereum/go-ethereum v1.13.11
github.com/go-cmd/cmd v1.4.2 github.com/go-cmd/cmd v1.4.2
github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da
github.com/kardianos/service v1.2.2 github.com/kardianos/service v1.2.2
...@@ -23,16 +23,23 @@ require ( ...@@ -23,16 +23,23 @@ require (
require ( require (
github.com/Microsoft/go-winio v0.6.1 // indirect github.com/Microsoft/go-winio v0.6.1 // indirect
github.com/beorn7/perks v1.0.1 // indirect github.com/beorn7/perks v1.0.1 // indirect
github.com/bits-and-blooms/bitset v1.10.0 // indirect
github.com/btcsuite/btcd/btcec/v2 v2.2.0 // indirect github.com/btcsuite/btcd/btcec/v2 v2.2.0 // indirect
github.com/cespare/xxhash/v2 v2.2.0 // indirect github.com/cespare/xxhash/v2 v2.2.0 // indirect
github.com/consensys/bavard v0.1.13 // indirect
github.com/consensys/gnark-crypto v0.12.1 // indirect
github.com/crate-crypto/go-kzg-4844 v0.7.0 // indirect
github.com/deckarep/golang-set/v2 v2.1.0 // indirect
github.com/decred/dcrd/dcrec/secp256k1/v4 v4.0.1 // indirect
github.com/distribution/reference v0.5.0 // indirect github.com/distribution/reference v0.5.0 // indirect
github.com/docker/distribution v2.8.3+incompatible // indirect github.com/docker/distribution v2.8.3+incompatible // indirect
github.com/docker/go-connections v0.5.0 // indirect
github.com/docker/go-units v0.5.0 // indirect github.com/docker/go-units v0.5.0 // indirect
github.com/ethereum/c-kzg-4844 v0.4.0 // indirect
github.com/fsnotify/fsnotify v1.7.0 // indirect github.com/fsnotify/fsnotify v1.7.0 // indirect
github.com/gogo/protobuf v1.3.2 // indirect github.com/gogo/protobuf v1.3.2 // indirect
github.com/golang/protobuf v1.5.3 // indirect github.com/golang/protobuf v1.5.3 // indirect
github.com/golang/snappy v0.0.5-0.20220116011046-fa5810519dcb // indirect github.com/golang/snappy v0.0.5-0.20220116011046-fa5810519dcb // indirect
github.com/google/uuid v1.4.0 // indirect
github.com/hashicorp/golang-lru v0.5.4 // indirect github.com/hashicorp/golang-lru v0.5.4 // indirect
github.com/hashicorp/hcl v1.0.0 // indirect github.com/hashicorp/hcl v1.0.0 // indirect
github.com/holiman/uint256 v1.2.4 // indirect github.com/holiman/uint256 v1.2.4 // indirect
...@@ -42,6 +49,7 @@ require ( ...@@ -42,6 +49,7 @@ require (
github.com/magiconair/properties v1.8.7 // indirect github.com/magiconair/properties v1.8.7 // indirect
github.com/matttproud/golang_protobuf_extensions v1.0.2-0.20181231171920-c182affec369 // indirect github.com/matttproud/golang_protobuf_extensions v1.0.2-0.20181231171920-c182affec369 // indirect
github.com/mitchellh/mapstructure v1.5.0 // indirect github.com/mitchellh/mapstructure v1.5.0 // indirect
github.com/mmcloughlin/addchain v0.4.0 // indirect
github.com/moby/term v0.5.0 // indirect github.com/moby/term v0.5.0 // indirect
github.com/morikuni/aec v1.0.0 // indirect github.com/morikuni/aec v1.0.0 // indirect
github.com/opencontainers/go-digest v1.0.0 // indirect github.com/opencontainers/go-digest v1.0.0 // indirect
...@@ -60,22 +68,24 @@ require ( ...@@ -60,22 +68,24 @@ require (
github.com/spf13/cast v1.6.0 // indirect github.com/spf13/cast v1.6.0 // indirect
github.com/spf13/pflag v1.0.5 // indirect github.com/spf13/pflag v1.0.5 // indirect
github.com/subosito/gotenv v1.6.0 // indirect github.com/subosito/gotenv v1.6.0 // indirect
github.com/supranational/blst v0.3.11 // indirect
go.uber.org/atomic v1.9.0 // indirect go.uber.org/atomic v1.9.0 // indirect
go.uber.org/multierr v1.9.0 // indirect go.uber.org/multierr v1.9.0 // indirect
golang.org/x/crypto v0.17.0 // indirect golang.org/x/crypto v0.17.0 // indirect
golang.org/x/exp v0.0.0-20231110203233-9a3e6036ecaa // indirect golang.org/x/exp v0.0.0-20231110203233-9a3e6036ecaa // indirect
golang.org/x/mod v0.14.0 // indirect golang.org/x/mod v0.14.0 // indirect
golang.org/x/net v0.19.0 // indirect golang.org/x/net v0.19.0 // indirect
golang.org/x/sync v0.5.0 // indirect
golang.org/x/sys v0.16.0 // indirect golang.org/x/sys v0.16.0 // indirect
golang.org/x/text v0.14.0 // indirect golang.org/x/text v0.14.0 // indirect
golang.org/x/tools v0.15.0 // indirect golang.org/x/tools v0.15.0 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20240108191215-35c7eff3a6b1 // indirect google.golang.org/genproto/googleapis/rpc v0.0.0-20240108191215-35c7eff3a6b1 // indirect
google.golang.org/protobuf v1.32.0 // indirect google.golang.org/protobuf v1.32.0 // indirect
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c // indirect
gopkg.in/ini.v1 v1.67.0 // indirect gopkg.in/ini.v1 v1.67.0 // indirect
gopkg.in/yaml.v2 v2.4.0 // indirect gopkg.in/yaml.v2 v2.4.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect
gotest.tools/v3 v3.5.1 // indirect gotest.tools/v3 v3.5.1 // indirect
rsc.io/tmplfunc v0.0.3 // indirect
) )
replace github.com/odysseus/odysseus-protocol => ../odysseus-protocol replace github.com/odysseus/odysseus-protocol => ../odysseus-protocol
This diff is collapsed.
package models

// RewardAddress is the JSON request body for NodeController.SetRewardAddress;
// Address carries the hex account address to receive rewards.
type RewardAddress struct {
	Address string `json:"address"`
}

// SeedUrl is the JSON request body for NodeController.SetNmSeed;
// Seed is the node-manager seed endpoint to connect to.
type SeedUrl struct {
	Seed string `json:"seed"`
}

// NodeManagerReq is the JSON request body for NodeController.AddNodeManager.
// Both fields are required (the handler rejects empty values).
type NodeManagerReq struct {
	PublicKey string `json:"public_key"`
	EndPoint  string `json:"end_point"`
}
package nm
import (
"example.com/m/log"
"example.com/m/models"
"example.com/m/operate"
nodeManagerV1 "github.com/odysseus/odysseus-protocol/gen/proto/go/nodemanager/v1"
"time"
)
// monitorModelInfo runs forever, once per second reconciling the known model
// list against the images Docker reports: images missing locally are pulled
// (when isResourceEnough approves) and model task ids are published on
// dockerOp.ModelTaskIdChan so they get reported upstream.
func monitorModelInfo(dockerOp *operate.DockerOp) {
	// TODO: fetch model info from the API side (hard-coded placeholder below).
	info := &models.ModelInfo{
		TaskId:       1,
		User:         "",
		Pwd:          "",
		Repository:   "",
		SignUrl:      "http://192.168.1.120:8888/llm/test/get/sign",
		ImageName:    "onlydd/llm-server:0119",
		DiskSize:     10000,
		MemorySize:   10000,
		IsImageExist: false,
	}
	ticker := time.NewTicker(time.Second * 1)
	for {
		select {
		case <-ticker.C:
			// "Freshly fetched" model list (currently just the placeholder),
			// indexed by task id for membership checks below.
			resp := make([]*models.ModelInfo, 0)
			resp = append(resp, info)
			modelInfoMap := make(map[uint64]*models.ModelInfo, 0)
			for _, modelInfo := range resp {
				modelInfoMap[modelInfo.TaskId] = modelInfo
			}
			imageNameMap, err := dockerOp.PsImageNameMap()
			if err != nil {
				log.Error("Docker op ps images failed:", err)
				continue
			}
			// NOTE(review): this ranges over the package-level modelInfos,
			// not the freshly built resp — confirm that is intended, since
			// modelInfoMap (from resp) is consulted at the bottom.
			for _, modelInfo := range modelInfos {
				if modelInfo.IsImageExist {
					continue
				}
				if !imageNameMap[modelInfo.ImageName] {
					// todo: check whether machine resources are sufficient
					isPull := isResourceEnough(modelInfo)
					// todo: if sufficient
					if isPull {
						// NOTE(review): pull runs async but IsImageExist is set
						// immediately — flag is optimistic, not confirmed.
						go dockerOp.PullImage(modelInfo)
						modelInfo.IsImageExist = true
						dockerOp.ModelTaskIdChan <- modelInfo.TaskId
					}
				} else if !dockerOp.IsReportModelTaskId[modelInfo.TaskId] {
					// Image already present: report the task id exactly once.
					dockerOp.ModelTaskIdChan <- modelInfo.TaskId
					dockerOp.IsReportModelTaskId[modelInfo.TaskId] = true
				}
				if modelInfoMap[modelInfo.TaskId] == nil {
					// Task id absent from the fetched list: record its sign
					// URL and keep the model in DockerOp's bookkeeping.
					dockerOp.SignApi[modelInfo.ImageName] = modelInfo.SignUrl
					dockerOp.ModelsInfo = append(dockerOp.ModelsInfo, modelInfo)
				}
			}
		}
	}
}
// reportModelInfo forwards every model task id published on
// dockerOp.ModelTaskIdChan to the node manager as a SubmitResourceMapRes
// message. It blocks for the lifetime of the channel.
func reportModelInfo(nodeManager *models.NodeManagerClient,
	worker nodeManagerV1.NodeManagerService_RegisterWorkerClient,
	msgRespWorker *RespMsgWorker, dockerOp *operate.DockerOp) {
	// range is the idiomatic single-channel receive loop; unlike the previous
	// for/select form it exits when the channel is closed instead of
	// busy-looping forever on zero-valued task ids.
	for taskId := range dockerOp.ModelTaskIdChan {
		params := buildParams(taskId)
		msgRespWorker.RegisterMsgResp(nodeManager, worker, SubmitResourceMapRes, params)
	}
}
// isResourceEnough reports whether this machine can host the given model.
// Currently a stub that always approves; the real check (presumably against
// modelInfo.DiskSize / MemorySize — confirm intent) is still TODO.
func isResourceEnough(modelInfo *models.ModelInfo) bool {
	return true
}
package nm
import (
"context"
"example.com/m/conf"
"example.com/m/log"
"example.com/m/operate"
nodeManagerV1 "github.com/odysseus/odysseus-protocol/gen/proto/go/nodemanager/v1"
"google.golang.org/grpc"
"time"
)
// monitorNodeManagerSeed polls the configured seed service for the list of
// known node managers and appends any endpoint not yet in nodeManagerArr.
// It polls once per second until the first successful sync (isInit cleared),
// then drops to a ten-minute cadence. Panics if the seed cannot be dialed or
// the ManagerList call fails, matching the original startup contract.
func monitorNodeManagerSeed() {
	ticker := time.NewTicker(time.Second * 1)
	defer ticker.Stop()
	for {
		select {
		case <-ticker.C:
			seed := conf.GetConfig().NmSeed
			log.Info("Nm seed url:", seed)
			seedServiceClient := operate.ConnNmGrpc(seed)
			if seedServiceClient == nil {
				panic("Dial nm seed service client failed")
			}
			list, err := seedServiceClient.ManagerList(context.Background(), &nodeManagerV1.ManagerListRequest{}, grpc.EmptyCallOption{})
			if err != nil {
				panic("Nm seed seed service is dealing")
			}
			if list.GetManagers() == nil || len(list.GetManagers()) == 0 {
				continue
			}
			for _, node := range list.GetManagers() {
				if isExistNodeManager(node.Endpoint) {
					continue
				}
				nodeManagerArr = append(nodeManagerArr, &NodeManager{Info: node, IsUsed: false, IsExist: true})
			}
			isInit = false
			// Bug fix: the original assigned a fresh time.NewTicker here,
			// leaking the old ticker (never stopped, keeps firing). Reset
			// reuses the same ticker at the slower cadence.
			ticker.Reset(time.Minute * 10)
		}
	}
}
...@@ -192,16 +192,18 @@ func SubmitResultResp(params ...interface{}) *nodemanagerV1.WorkerMessage { ...@@ -192,16 +192,18 @@ func SubmitResultResp(params ...interface{}) *nodemanagerV1.WorkerMessage {
minerSign := params[2].([]byte) minerSign := params[2].([]byte)
taskResultHeader := params[3].([]byte) taskResultHeader := params[3].([]byte)
taskResultBody := params[4].([]byte) taskResultBody := params[4].([]byte)
isSuccess := params[5].(bool) taskResExecTime := params[5].(int64)
isSuccess := params[6].(bool)
submitResultMsgRes := &nodemanagerV1.WorkerMessage{ submitResultMsgRes := &nodemanagerV1.WorkerMessage{
Message: &nodemanagerV1.WorkerMessage_SubmitTaskResult{ Message: &nodemanagerV1.WorkerMessage_SubmitTaskResult{
SubmitTaskResult: &nodemanagerV1.SubmitTaskResult{ SubmitTaskResult: &nodemanagerV1.SubmitTaskResult{
TaskUuid: taskId, TaskUuid: taskId,
ContainerSignature: containerSign, ContainerSignature: containerSign,
MinerSignature: minerSign, MinerSignature: minerSign,
TaskResultHeader: taskResultHeader, TaskResultHeader: taskResultHeader,
TaskResultBody: taskResultBody, TaskResultBody: taskResultBody,
IsSuccessed: isSuccess, TaskExecuteDuration: uint64(taskResExecTime),
IsSuccessed: isSuccess,
}, },
}, },
} }
......
package nm
import (
"example.com/m/models"
"example.com/m/operate"
nodeManagerV1 "github.com/odysseus/odysseus-protocol/gen/proto/go/nodemanager/v1"
"time"
)
// NodeManager wraps a discovered node-manager endpoint together with its
// local lifecycle flags.
type NodeManager struct {
	// Info holds the endpoint and public key as reported by the seed service.
	Info *nodeManagerV1.NodeManagerInfo `json:"info,omitempty"`
	// IsUsed marks managers we currently hold an active client connection to.
	IsUsed bool `json:"isUsed,omitempty"`
	// IsExist is cleared by DelNodeManager (soft delete) instead of removing
	// the entry from nodeManagerArr.
	IsExist bool `json:"isExist,omitempty"`
}
// GetNodeManagers returns the package-level node-manager registry.
// NOTE(review): callers receive the shared slice, not a copy — mutating the
// result aliases the registry itself.
func GetNodeManagers() []*NodeManager {
	return nodeManagerArr
}
// AddNodeManager appends node to the registry, marked as present but not yet
// connected.
func AddNodeManager(node *nodeManagerV1.NodeManagerInfo) {
	entry := &NodeManager{Info: node, IsUsed: false, IsExist: true}
	nodeManagerArr = append(nodeManagerArr, entry)
}
// DelNodeManager soft-deletes every registry entry whose endpoint matches
// node's by clearing its IsExist flag; entries are never removed physically.
func DelNodeManager(node *nodeManagerV1.NodeManagerInfo) {
	for i := range nodeManagerArr {
		if nodeManagerArr[i].Info.Endpoint == node.Endpoint {
			nodeManagerArr[i].IsExist = false
		}
	}
}
// getUnUsedNodeManagers collects the registry entries that are still present
// but have no active client connection.
func getUnUsedNodeManagers() []*NodeManager {
	idle := make([]*NodeManager, 0)
	for i := range nodeManagerArr {
		candidate := nodeManagerArr[i]
		if candidate.IsExist && !candidate.IsUsed {
			idle = append(idle, candidate)
		}
	}
	return idle
}
// isExistNodeManager reports whether endPoint is already registered in
// nodeManagerArr (regardless of its IsExist/IsUsed flags).
func isExistNodeManager(endPoint string) bool {
	for i := range nodeManagerArr {
		if nodeManagerArr[i].Info.Endpoint == endPoint {
			return true
		}
	}
	return false
}
// getNodeManager returns the first registry entry with the given endpoint,
// or nil when none matches.
func getNodeManager(endPoint string) *NodeManager {
	for i := range nodeManagerArr {
		if candidate := nodeManagerArr[i]; candidate.Info.Endpoint == endPoint {
			return candidate
		}
	}
	return nil
}
// inputNodeManagerChan dials the manager's gRPC endpoint and, on success,
// publishes the client (creating one when nodeManagerClient is nil) on
// nodeManagerClientChan, records it in usedNodeManagerClient and flags the
// manager as used. Returns false when the endpoint cannot be reached.
func inputNodeManagerChan(manager *NodeManager, nodeManagerClient *models.NodeManagerClient) bool {
	// Dial first: there is no point building a client for a dead endpoint.
	serviceClient := operate.ConnNmGrpc(manager.Info.Endpoint)
	if serviceClient == nil {
		return false
	}
	if nodeManagerClient == nil {
		nodeManagerClient = &models.NodeManagerClient{
			PublicKey:     manager.Info.Publickey,
			Endpoint:      manager.Info.Endpoint,
			Status:        true,
			LastHeartTime: time.Now().UnixMilli(),
		}
	}
	nodeManagerClient.Status = true
	nodeManagerClient.Client = serviceClient
	nodeManagerClientChan <- nodeManagerClient
	usedNodeManagerClient = append(usedNodeManagerClient, nodeManagerClient)
	manager.IsUsed = true
	return true
}
...@@ -6,6 +6,7 @@ import ( ...@@ -6,6 +6,7 @@ import (
"example.com/m/log" "example.com/m/log"
"example.com/m/models" "example.com/m/models"
"example.com/m/operate" "example.com/m/operate"
"example.com/m/utils"
"example.com/m/validator" "example.com/m/validator"
"fmt" "fmt"
nodeManagerV1 "github.com/odysseus/odysseus-protocol/gen/proto/go/nodemanager/v1" nodeManagerV1 "github.com/odysseus/odysseus-protocol/gen/proto/go/nodemanager/v1"
...@@ -14,12 +15,6 @@ import ( ...@@ -14,12 +15,6 @@ import (
"time" "time"
) )
type NodeManager struct {
Info *nodeManagerV1.NodeManagerInfo
IsUsed bool
IsExist bool
}
// 指定远程 Docker 服务的地址 // 指定远程 Docker 服务的地址
var ( var (
isInit = true isInit = true
...@@ -27,11 +22,13 @@ var ( ...@@ -27,11 +22,13 @@ var (
usedNodeManagerClient []*models.NodeManagerClient usedNodeManagerClient []*models.NodeManagerClient
nodeManagerClientChan chan *models.NodeManagerClient nodeManagerClientChan chan *models.NodeManagerClient
nodeManagerMsgChan chan *nodeManagerV1.ManagerMessage nodeManagerMsgChan chan *nodeManagerV1.ManagerMessage
modelInfos []*models.ModelInfo
) )
func init() { func init() {
nodeManagerArr = make([]*NodeManager, 0) nodeManagerArr = make([]*NodeManager, 0)
usedNodeManagerClient = make([]*models.NodeManagerClient, 0) usedNodeManagerClient = make([]*models.NodeManagerClient, 0)
modelInfos = make([]*models.ModelInfo, 0)
nodeManagerClientChan = make(chan *models.NodeManagerClient, 0) nodeManagerClientChan = make(chan *models.NodeManagerClient, 0)
nodeManagerMsgChan = make(chan *nodeManagerV1.ManagerMessage, 1000) nodeManagerMsgChan = make(chan *nodeManagerV1.ManagerMessage, 1000)
} }
...@@ -53,19 +50,23 @@ func StartMonitor() { ...@@ -53,19 +50,23 @@ func StartMonitor() {
} }
connectNodeManagerCount := 0 connectNodeManagerCount := 0
for _, manager := range nodeManagerArr { for i := 0; i < len(nodeManagerArr); i++ {
if !manager.IsExist {
continue
}
// TODO: 需要对索引进行一定的规则判断,随机选择其中的nodeManager进行链接 // TODO: 需要对索引进行一定的规则判断,随机选择其中的nodeManager进行链接
if int64(connectNodeManagerCount) == conf.GetConfig().NodeManagerNum { if int64(connectNodeManagerCount) == conf.GetConfig().NodeManagerNum {
break break
} }
isSuccess := inputNodeManagerChan(manager, nil) randomNum := utils.GenerateRandomNumber(conf.GetConfig().SignPrivateKey, int64(len(nodeManagerArr)))
if !isSuccess { manager := nodeManagerArr[randomNum.Int64()]
panic("Init input node manager chan failed") if !manager.IsExist {
continue
}
if !manager.IsUsed {
isSuccess := inputNodeManagerChan(manager, nil)
if !isSuccess {
panic("Init input node manager chan failed")
}
connectNodeManagerCount++
} }
connectNodeManagerCount++
} }
log.Info("Monitoring node manager client thread start......") log.Info("Monitoring node manager client thread start......")
...@@ -84,19 +85,22 @@ func StartMonitor() { ...@@ -84,19 +85,22 @@ func StartMonitor() {
if !managerClient.IsDel { if !managerClient.IsDel {
// TODO: 重试连接三次 // TODO: 重试连接三次
isSuccess = inputNodeManagerChan(manager, managerClient) isSuccess = inputNodeManagerChan(manager, managerClient)
if isSuccess {
continue
}
}
managerClient.IsDel = true
unUsedNodeManagers := getUnUsedNodeManagers()
if unUsedNodeManagers == nil || len(unUsedNodeManagers) == 0 {
break
} }
if !isSuccess { randomNum := utils.GenerateRandomNumber(conf.GetConfig().SignPrivateKey, int64(len(nodeManagerArr)))
managerClient.IsDel = true unUsedManager := unUsedNodeManagers[randomNum.Int64()]
unUsedNodeManager := getUnUsedNodeManager() for i := 0; i < len(unUsedNodeManagers); i++ {
if unUsedNodeManager == nil || len(unUsedNodeManager) == 0 { isSuccess := inputNodeManagerChan(unUsedManager, nil)
if !isSuccess {
break break
} }
for _, nodeManager := range unUsedNodeManager {
isSuccess := inputNodeManagerChan(nodeManager, nil)
if !isSuccess {
break
}
}
} }
} }
} }
...@@ -104,76 +108,6 @@ func StartMonitor() { ...@@ -104,76 +108,6 @@ func StartMonitor() {
} }
} }
func getUnUsedNodeManager() []*NodeManager {
res := make([]*NodeManager, 0)
for _, manager := range nodeManagerArr {
if !manager.IsUsed && manager.IsExist {
res = append(res, manager)
}
}
return res
}
func isExistNodeManager(endPoint string) bool {
for _, manager := range nodeManagerArr {
if endPoint == manager.Info.Endpoint {
return true
}
}
return false
}
func monitorNodeManagerSeed() {
ticker := time.NewTicker(time.Second * 1)
for {
select {
case <-ticker.C:
seed := conf.GetConfig().NmSeed
log.Info("Nm seed url:", seed)
seedServiceClient := operate.ConnNmGrpc(seed)
if seedServiceClient == nil {
panic("Dial nm seed service client failed")
}
list, err := seedServiceClient.ManagerList(context.Background(), &nodeManagerV1.ManagerListRequest{}, grpc.EmptyCallOption{})
if err != nil {
panic("Nm seed seed service is dealing")
}
if list.GetManagers() == nil || len(list.GetManagers()) == 0 {
continue
}
for _, node := range list.GetManagers() {
if isExistNodeManager(node.Endpoint) {
continue
}
nodeManagerArr = append(nodeManagerArr, &NodeManager{Info: node, IsUsed: false, IsExist: true})
}
isInit = false
ticker = time.NewTicker(time.Minute * 10)
}
}
}
func GetNodeManagers() []*NodeManager {
return nodeManagerArr
}
func AddNodeManager(node *nodeManagerV1.NodeManagerInfo) {
nodeManager := &NodeManager{
Info: node,
IsUsed: false,
IsExist: true,
}
nodeManagerArr = append(nodeManagerArr, nodeManager)
}
func DelNodeManager(node *nodeManagerV1.NodeManagerInfo) {
for _, manager := range nodeManagerArr {
if manager.Info.Endpoint == node.Endpoint {
manager.IsExist = false
}
}
}
// monitorWorker 监听worker // monitorWorker 监听worker
func monitorWorker(op *operate.DockerOp) { func monitorWorker(op *operate.DockerOp) {
log.Info("Monitoring worker thread start......") log.Info("Monitoring worker thread start......")
...@@ -243,57 +177,6 @@ func monitorWorker(op *operate.DockerOp) { ...@@ -243,57 +177,6 @@ func monitorWorker(op *operate.DockerOp) {
} }
} }
func monitorModelInfo(dockerOp *operate.DockerOp) {
// TODO: 向api端获取model信息
modelInfo := &models.ModelInfo{
TaskId: 1,
User: "",
Pwd: "",
Repository: "",
SignUrl: "http://192.168.1.120:8888/llm/test/get/sign",
ImageName: "onlydd/llm-server:0119",
DiskSize: 10000,
MemorySize: 10000,
IsImageExist: false,
}
imageNameMap, err := dockerOp.PsImageNameMap()
if err != nil {
log.Error("Docker op ps images failed:", err)
return
}
if !imageNameMap[modelInfo.ImageName] {
// todo: 判断机器资源是否够用
isPull := IsResourceEnough(modelInfo)
// todo: 如果够用
if isPull {
go dockerOp.PullImage(modelInfo)
modelInfo.IsImageExist = true
dockerOp.ModelTaskIdChan <- modelInfo.TaskId
}
} else if !dockerOp.IsReportModelTaskId[modelInfo.TaskId] {
dockerOp.ModelTaskIdChan <- modelInfo.TaskId
dockerOp.IsReportModelTaskId[modelInfo.TaskId] = true
}
dockerOp.SignApi[modelInfo.ImageName] = modelInfo.SignUrl
dockerOp.ModelsInfo = append(dockerOp.ModelsInfo, modelInfo)
}
func reportModelInfo(nodeManager *models.NodeManagerClient,
worker nodeManagerV1.NodeManagerService_RegisterWorkerClient,
msgRespWorker *RespMsgWorker, dockerOp *operate.DockerOp) {
for {
select {
case taskId := <-dockerOp.ModelTaskIdChan:
params := buildParams(taskId)
msgRespWorker.RegisterMsgResp(nodeManager, worker, SubmitResourceMapRes, params)
}
}
}
func IsResourceEnough(modelInfo *models.ModelInfo) bool {
return true
}
// handlerMsg 通过 goroutine 处理Msg // handlerMsg 通过 goroutine 处理Msg
func handlerMsg(nodeManager *models.NodeManagerClient, func handlerMsg(nodeManager *models.NodeManagerClient,
worker nodeManagerV1.NodeManagerService_RegisterWorkerClient, worker nodeManagerV1.NodeManagerService_RegisterWorkerClient,
...@@ -327,6 +210,7 @@ func handlerMsg(nodeManager *models.NodeManagerClient, ...@@ -327,6 +210,7 @@ func handlerMsg(nodeManager *models.NodeManagerClient,
taskMsgWorker.Wg.Wait() taskMsgWorker.Wg.Wait()
taskResHeader := taskMsgWorker.TaskRespHeader[taskMsg.TaskUuid] taskResHeader := taskMsgWorker.TaskRespHeader[taskMsg.TaskUuid]
taskResBody := taskMsgWorker.TaskRespBody[taskMsg.TaskUuid] taskResBody := taskMsgWorker.TaskRespBody[taskMsg.TaskUuid]
taskResExecTime := taskMsgWorker.TaskExecTime[taskMsg.TaskUuid]
isSuccess := taskMsgWorker.TaskIsSuccess[taskMsg.TaskUuid] isSuccess := taskMsgWorker.TaskIsSuccess[taskMsg.TaskUuid]
containerSign := taskMsgWorker.DockerOp.GetContainerSign(taskMsg, taskResBody) containerSign := taskMsgWorker.DockerOp.GetContainerSign(taskMsg, taskResBody)
if containerSign == nil || len(containerSign) == 0 { if containerSign == nil || len(containerSign) == 0 {
...@@ -334,7 +218,7 @@ func handlerMsg(nodeManager *models.NodeManagerClient, ...@@ -334,7 +218,7 @@ func handlerMsg(nodeManager *models.NodeManagerClient,
isSuccess = false isSuccess = false
} }
reqHash, respHash, minerSign := taskMsgWorker.GetMinerSign(taskMsg, taskResBody) reqHash, respHash, minerSign := taskMsgWorker.GetMinerSign(taskMsg, taskResBody)
params := buildParams(taskMsg.TaskUuid, containerSign, minerSign, taskResHeader, taskResBody, isSuccess) params := buildParams(taskMsg.TaskUuid, containerSign, minerSign, taskResHeader, taskResBody, taskResExecTime, isSuccess)
taskMsgWorker.LruCache.Add(taskMsg.TaskUuid+models.ContainerSign, containerSign) taskMsgWorker.LruCache.Add(taskMsg.TaskUuid+models.ContainerSign, containerSign)
taskMsgWorker.LruCache.Add(taskMsg.TaskUuid+models.MinerSign, minerSign) taskMsgWorker.LruCache.Add(taskMsg.TaskUuid+models.MinerSign, minerSign)
taskMsgWorker.LruCache.Add(taskMsg.TaskUuid+models.ReqHash, reqHash) taskMsgWorker.LruCache.Add(taskMsg.TaskUuid+models.ReqHash, reqHash)
...@@ -408,33 +292,3 @@ func buildParams(params ...interface{}) []interface{} { ...@@ -408,33 +292,3 @@ func buildParams(params ...interface{}) []interface{} {
} }
return res return res
} }
func inputNodeManagerChan(manager *NodeManager, nodeManagerClient *models.NodeManagerClient) bool {
if nodeManagerClient == nil {
nodeManagerClient = &models.NodeManagerClient{
PublicKey: manager.Info.Publickey,
Endpoint: manager.Info.Endpoint,
Status: true,
LastHeartTime: time.Now().UnixMilli(),
}
}
serviceClient := operate.ConnNmGrpc(manager.Info.Endpoint)
if serviceClient == nil {
return false
}
nodeManagerClient.Status = true
nodeManagerClient.Client = serviceClient
nodeManagerClientChan <- nodeManagerClient
usedNodeManagerClient = append(usedNodeManagerClient, nodeManagerClient)
manager.IsUsed = true
return true
}
func getNodeManager(endPoint string) *NodeManager {
for _, manager := range nodeManagerArr {
if manager.Info.Endpoint == endPoint {
return manager
}
}
return nil
}
...@@ -28,6 +28,7 @@ type TaskHandler struct { ...@@ -28,6 +28,7 @@ type TaskHandler struct {
CmdOp *operate.Command CmdOp *operate.Command
TaskMsg chan *nodeManagerV1.PushTaskMessage TaskMsg chan *nodeManagerV1.PushTaskMessage
TaskRespHeader map[string][]byte TaskRespHeader map[string][]byte
TaskExecTime map[string]int64
TaskRespBody map[string][]byte TaskRespBody map[string][]byte
TaskIsSuccess map[string]bool TaskIsSuccess map[string]bool
HttpClient *http.Client HttpClient *http.Client
...@@ -39,6 +40,7 @@ func NewTaskWorker(op *operate.DockerOp) *TaskHandler { ...@@ -39,6 +40,7 @@ func NewTaskWorker(op *operate.DockerOp) *TaskHandler {
LruCache: lru.New(100), LruCache: lru.New(100),
DockerOp: op, DockerOp: op,
TaskMsg: make(chan *nodeManagerV1.PushTaskMessage, 0), TaskMsg: make(chan *nodeManagerV1.PushTaskMessage, 0),
TaskExecTime: make(map[string]int64, 0),
TaskRespHeader: make(map[string][]byte, 0), TaskRespHeader: make(map[string][]byte, 0),
TaskRespBody: make(map[string][]byte, 0), TaskRespBody: make(map[string][]byte, 0),
TaskIsSuccess: make(map[string]bool, 0), TaskIsSuccess: make(map[string]bool, 0),
...@@ -85,6 +87,7 @@ func (t *TaskHandler) ComputeTaskHandler(taskMsg *nodeManagerV1.PushTaskMessage) ...@@ -85,6 +87,7 @@ func (t *TaskHandler) ComputeTaskHandler(taskMsg *nodeManagerV1.PushTaskMessage)
defer t.Wg.Done() defer t.Wg.Done()
t.TaskRespBody[taskMsg.TaskUuid] = nil t.TaskRespBody[taskMsg.TaskUuid] = nil
t.TaskRespHeader[taskMsg.TaskUuid] = nil t.TaskRespHeader[taskMsg.TaskUuid] = nil
t.TaskExecTime[taskMsg.TaskUuid] = 0
t.TaskIsSuccess[taskMsg.TaskUuid] = false t.TaskIsSuccess[taskMsg.TaskUuid] = false
reader := bytes.NewReader(taskMsg.TaskParam) reader := bytes.NewReader(taskMsg.TaskParam)
taskCmd := &models.TaskCmd{} taskCmd := &models.TaskCmd{}
...@@ -150,11 +153,14 @@ func (t *TaskHandler) ComputeTaskHandler(taskMsg *nodeManagerV1.PushTaskMessage) ...@@ -150,11 +153,14 @@ func (t *TaskHandler) ComputeTaskHandler(taskMsg *nodeManagerV1.PushTaskMessage)
log.Infof("Started container with ID %s", containerId) log.Infof("Started container with ID %s", containerId)
} }
startBeforeTaskTime := time.Now()
post, err := t.HttpClient.Post(taskCmd.ApiUrl, "application/json", reader) post, err := t.HttpClient.Post(taskCmd.ApiUrl, "application/json", reader)
if err != nil { if err != nil {
log.Error("Http client post error: ", err) log.Error("Http client post error: ", err)
return return
} }
endAfterTaskTime := time.Since(startBeforeTaskTime)
log.WithField("time", endAfterTaskTime.Seconds()).Info("Exec task end (second is units) :")
if post.StatusCode == http.StatusOK { if post.StatusCode == http.StatusOK {
headers, err := json.Marshal(post.Header) headers, err := json.Marshal(post.Header)
if err != nil { if err != nil {
...@@ -169,6 +175,7 @@ func (t *TaskHandler) ComputeTaskHandler(taskMsg *nodeManagerV1.PushTaskMessage) ...@@ -169,6 +175,7 @@ func (t *TaskHandler) ComputeTaskHandler(taskMsg *nodeManagerV1.PushTaskMessage)
t.TaskRespHeader[taskMsg.TaskUuid] = headers t.TaskRespHeader[taskMsg.TaskUuid] = headers
t.TaskRespBody[taskMsg.TaskUuid] = readBody t.TaskRespBody[taskMsg.TaskUuid] = readBody
t.TaskIsSuccess[taskMsg.TaskUuid] = true t.TaskIsSuccess[taskMsg.TaskUuid] = true
t.TaskExecTime[taskMsg.TaskUuid] = endAfterTaskTime.Microseconds()
} }
log.Info("received computeTask--------------------------------") log.Info("received computeTask--------------------------------")
} }
......
...@@ -125,7 +125,6 @@ func (d *DockerOp) ListContainer() []types.Container { ...@@ -125,7 +125,6 @@ func (d *DockerOp) ListContainer() []types.Container {
} }
func (d *DockerOp) CreateAndStartContainer(imageName string, dockerCmd *models.DockerCmd) (string, error) { func (d *DockerOp) CreateAndStartContainer(imageName string, dockerCmd *models.DockerCmd) (string, error) {
containerId, err := d.CreateContainer(imageName, dockerCmd) containerId, err := d.CreateContainer(imageName, dockerCmd)
if err != nil { if err != nil {
log.Error("Error creating container image failed: ", err) log.Error("Error creating container image failed: ", err)
...@@ -182,6 +181,14 @@ func (d *DockerOp) CreateContainer(imageName string, dockerCmd *models.DockerCmd ...@@ -182,6 +181,14 @@ func (d *DockerOp) CreateContainer(imageName string, dockerCmd *models.DockerCmd
}, &container.HostConfig{ }, &container.HostConfig{
PortBindings: portMap, PortBindings: portMap,
AutoRemove: true, // 容器停止后自动删除 AutoRemove: true, // 容器停止后自动删除
Resources: container.Resources{
DeviceRequests: []container.DeviceRequest{
{
Driver: "nvidia",
Count: -1, // -1 means all GPUs
},
},
},
}, nil, nil, "") }, nil, nil, "")
if err != nil { if err != nil {
...@@ -229,6 +236,7 @@ func (d *DockerOp) StopAndDeleteContainer(containerID string) bool { ...@@ -229,6 +236,7 @@ func (d *DockerOp) StopAndDeleteContainer(containerID string) bool {
log.Info("Container stopped successfully.") log.Info("Container stopped successfully.")
return true return true
} }
func (d *DockerOp) RmContainer(containerID string) { func (d *DockerOp) RmContainer(containerID string) {
ctx, cancel := context.WithTimeout(context.Background(), time.Second*10) ctx, cancel := context.WithTimeout(context.Background(), time.Second*10)
defer cancel() defer cancel()
......
package test

import (
	"crypto/ecdsa"
	"example.com/m/conf"
	"example.com/m/utils"
	"math/big"
	"testing"
)

// TestGenerateRandomNumber checks that GenerateRandomNumber returns a
// non-nil value inside [0, length).
//
// The previous version compared the result against a fixed big.NewInt(1);
// since the output is uniformly random over [0, 2) that assertion failed
// roughly half the time. A range check is the strongest deterministic
// property of a random generator.
func TestGenerateRandomNumber(t *testing.T) {
	type args struct {
		privateKey *ecdsa.PrivateKey
		length     int64
	}
	tests := []struct {
		name string
		args args
	}{
		{
			"randomPrvTest",
			args{
				privateKey: conf.GetConfig().SignPrivateKey,
				length:     2,
			},
		},
	}
	for _, tt := range tests {
		t.Run(tt.name, func(t *testing.T) {
			got := utils.GenerateRandomNumber(tt.args.privateKey, tt.args.length)
			if got == nil {
				t.Fatal("GenerateRandomNumber() returned nil")
			}
			// Result must lie in [0, length).
			if got.Sign() < 0 || got.Cmp(big.NewInt(tt.args.length)) >= 0 {
				t.Errorf("GenerateRandomNumber() = %v, want value in [0, %d)", got, tt.args.length)
			}
		})
	}
}
package utils
import (
"crypto/ecdsa"
"crypto/elliptic"
"crypto/rand"
"example.com/m/log"
"fmt"
"github.com/ethereum/go-ethereum/accounts/keystore"
"github.com/ethereum/go-ethereum/crypto"
"io/ioutil"
"math/big"
"os"
)
const KEYPATH_PWD = "keystore"
// GenerateRandomNumber returns a cryptographically random number in
// [0, length), drawn via crypto/rand.
//
// NOTE(review): the privateKey parameter is never used; it is kept only so
// existing call sites keep compiling — confirm whether it can be removed.
func GenerateRandomNumber(privateKey *ecdsa.PrivateKey, length int64) *big.Int {
	// Draw a random value below the P-256 group order.
	randNum, err := rand.Int(rand.Reader, elliptic.P256().Params().N)
	if err != nil {
		// rand.Int with rand.Reader essentially never fails, but the
		// original fell through and called Mod on a nil *big.Int (panic).
		// Return zero instead of crashing.
		log.Error("Error generating random number:", err)
		return big.NewInt(0)
	}
	// Reduce into the requested range [0, length).
	randNum.Mod(randNum, big.NewInt(length))
	log.Info("Generating random number:", randNum)
	return randNum
}
// GetPrv loads the node's signing key. On first run (no keystore directory
// yet) it generates a fresh key and persists it; on later runs it decrypts
// and returns the stored key.
//
// It never returns (nil, nil): a missing/undecryptable key is reported as
// an explicit error so callers cannot mistake it for success.
func GetPrv() (*ecdsa.PrivateKey, error) {
	if _, err := os.Stat(KEYPATH_PWD); os.IsNotExist(err) {
		log.Info("Keystore not found. Generating a new one...")
		// First run: create a brand-new key pair.
		privateKey, err := generatePrivateKey()
		if err != nil {
			log.Error("Error generating private key:", err)
			return nil, err
		}
		// Persist the key to the keystore directory.
		if err = savePrivateKey(privateKey); err != nil {
			log.Error("Error saving private key:", err)
			return nil, err
		}
		log.Info("Keystore generated successfully.")
		return privateKey, nil
	}
	log.Info("Keystore found. Reading private key...")
	privateKey, err := readPrivateKey()
	if err != nil {
		log.Error("Error reading private key:", err)
		return nil, err
	}
	if privateKey == nil {
		// readPrivateKey may report "nothing usable" as (nil, nil); the
		// original propagated that and the caller panicked later. Surface
		// it as a real error here.
		return nil, fmt.Errorf("no decryptable private key found in %q", KEYPATH_PWD)
	}
	// Do NOT log the key material itself (the original printed the raw
	// private key, which leaks it into log files).
	log.Info("Private key read successfully.")
	return privateKey, nil
}
// generatePrivateKey creates a new secp256k1 private key using the
// go-ethereum crypto package.
func generatePrivateKey() (*ecdsa.PrivateKey, error) {
	key, err := crypto.GenerateKey()
	if err != nil {
		return nil, err
	}
	return key, nil
}
// savePrivateKey encrypts privateKey into a new keystore file under
// KEYPATH_PWD and prints the derived account address.
//
// NOTE(review): KEYPATH_PWD (the directory name) doubles as the encryption
// passphrase — effectively a hard-coded secret. Consider sourcing a real
// passphrase.
func savePrivateKey(privateKey *ecdsa.PrivateKey) error {
	ks := keystore.NewKeyStore(KEYPATH_PWD, keystore.StandardScryptN, keystore.StandardScryptP)
	account, err := ks.ImportECDSA(privateKey, KEYPATH_PWD)
	if err != nil {
		// The original called log.Fatal here, killing the whole process and
		// making the declared error return unreachable. Propagate instead so
		// the caller (GetPrv) can handle it.
		log.Error("import ECDSA key into keystore failed:", err)
		return err
	}
	fmt.Println(account.Address.Hex())
	return nil
}
// readPrivateKey scans the keystore directory and returns the first key
// that decrypts with the well-known passphrase (KEYPATH_PWD). Unreadable
// or undecryptable files are logged and skipped.
//
// Unlike the original (which returned (nil, nil)), it returns an explicit
// error when no usable key is found, so callers never get a nil key with a
// nil error.
func readPrivateKey() (*ecdsa.PrivateKey, error) {
	entries, err := ioutil.ReadDir(KEYPATH_PWD)
	if err != nil {
		return nil, err
	}
	for _, info := range entries {
		keystoreFile := fmt.Sprintf("%s%s%s", KEYPATH_PWD, "/", info.Name())
		jsonBytes, err := ioutil.ReadFile(keystoreFile)
		if err != nil {
			log.Error("import ecdsa keystore error: ", err)
			continue
		}
		key, err := keystore.DecryptKey(jsonBytes, KEYPATH_PWD)
		if err != nil {
			log.Error("keystore decrypt key failed:", err)
			continue
		}
		return key.PrivateKey, nil
	}
	return nil, fmt.Errorf("no decryptable keystore file in %q", KEYPATH_PWD)
}
...@@ -108,7 +108,7 @@ func (p *ProofWorker) ProofStorage() { ...@@ -108,7 +108,7 @@ func (p *ProofWorker) ProofStorage() {
} }
func (p *ProofWorker) CommitWitness() { func (p *ProofWorker) CommitWitness() {
validatorClient := operate.ConnValidatorGrpc("192.168.1.180:9431") validatorClient := operate.ConnValidatorGrpc("192.168.1.211:9431")
for { for {
select { select {
case proofs := <-p.consumeProofChan: case proofs := <-p.consumeProofChan:
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment