Commit def8d4d2 authored by duanjinfei's avatar duanjinfei

change commit validator

parent 5b4b486b
......@@ -2,3 +2,4 @@
logs
*.DS_Store
*/mydb/
mydb
......@@ -22,7 +22,6 @@ type Config struct {
HeartRespTimeMillis int64
TaskValidatorTime float64 `json:"task_validator_time"`
BenefitAddress string `json:"benefit_address"`
DockerSignApi string `json:"docker_sign_api"`
ContainerNum int64 `json:"container_num"`
}
......@@ -36,7 +35,7 @@ func init() {
viper.SetConfigType("json")
// Set the directory that contains the config file
viper.AddConfigPath("../")
viper.AddConfigPath("./")
// Read the config file
if err := viper.ReadInConfig(); err != nil {
......
......@@ -5,6 +5,5 @@
"benefit_address": "0x84A3175be614F5886f99Da506dF08682DF530739",
"heart_response": 30,
"task_validator_time": 1,
"docker_sign_api":"http://192.168.1.120:8888/llm/test/get/sign",
"container_num": 1
}
\ No newline at end of file
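For reference, a minimal, self-contained sketch of loading this config.json into the Config struct with viper. The mapstructure tags and the heart_response -> HeartRespTimeMillis mapping are assumptions about the repository's loader, not code from this commit:

```go
package main

import (
	"fmt"

	"github.com/spf13/viper"
)

// Config mirrors the fields shown in the hunk above; tag names are assumed,
// since viper.Unmarshal matches on mapstructure tags by default.
type Config struct {
	BenefitAddress      string  `mapstructure:"benefit_address"`
	HeartRespTimeMillis int64   `mapstructure:"heart_response"`
	TaskValidatorTime   float64 `mapstructure:"task_validator_time"`
	ContainerNum        int64   `mapstructure:"container_num"`
}

func main() {
	viper.SetConfigName("config")
	viper.SetConfigType("json")
	viper.AddConfigPath("../")
	viper.AddConfigPath("./")
	if err := viper.ReadInConfig(); err != nil {
		panic(err)
	}
	var cfg Config
	if err := viper.Unmarshal(&cfg); err != nil {
		panic(err)
	}
	fmt.Printf("loaded config: %+v\n", cfg)
}
```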
......@@ -17,12 +17,23 @@ func init() {
if err != nil {
log.Error("Leveldb open file failed: ", err)
}
defer func(dbInstance *leveldb.DB) {
err := dbInstance.Close()
if err != nil {
log.Error("Leveldb close file failed: ", err)
// Iterate over the database and delete all existing data
iter := dbInstance.NewIterator(nil, nil)
for iter.Next() {
key := iter.Key()
// Delete the data stored under this key
if err := dbInstance.Delete(key, nil); err != nil {
log.Error("Leveldb delete failed: ", err)
}
}
}(dbInstance)
iter.Release()
//defer func(dbInstance *leveldb.DB) {
// err := dbInstance.Close()
// if err != nil {
// log.Error("Leveldb close file failed: ", err)
// }
//}(dbInstance)
}
func Put(key string, value []byte) error {
......
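The new init clears the store key by key inside the loop above. A hedged alternative, not part of this commit, is to queue the deletes in a single goleveldb batch so the wipe is applied in one write:

```go
package db

import "github.com/syndtr/goleveldb/leveldb"

// wipeAll is a sketch: collect every key into a Batch and apply it with one
// Write call instead of issuing a Delete per key. Batch copies the key bytes
// internally, so reusing iter.Key() inside the loop is safe.
func wipeAll(dbInstance *leveldb.DB) error {
	batch := new(leveldb.Batch)
	iter := dbInstance.NewIterator(nil, nil)
	for iter.Next() {
		batch.Delete(iter.Key())
	}
	iter.Release()
	if err := iter.Error(); err != nil {
		return err
	}
	return dbInstance.Write(batch, nil)
}
```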
......@@ -19,6 +19,8 @@ type DockerCmd struct {
type TaskReq struct {
TaskId string `json:"task_id"`
TaskParam []byte `json:"task_param"`
TaskResult []byte `json:"task_result"`
}
type ContainerSignStruct struct {
......
MANIFEST-000063
MANIFEST-000026
MANIFEST-000061
MANIFEST-000024
......@@ -35,6 +35,7 @@ func (o *RespMsgWorker) RegisterMsgResp(nodeManager *models.NodeManagerClient, w
handler: handler,
params: params,
}
log.Info("---------------------------------------Send register msg ------------------------------------")
}
func (o *RespMsgWorker) SendMsg() {
......@@ -65,6 +66,7 @@ func HeartbeatResp(params ...interface{}) *nodemanagerV1.WorkerMessage {
},
},
}
log.Info("---------------------------------------Send heart beat msg ------------------------------------")
return heartRes
}
......@@ -83,6 +85,7 @@ func SubmitResourceMapRes(params ...interface{}) *nodemanagerV1.WorkerMessage {
},
},
}
log.Info("---------------------------------------Send resource map msg ------------------------------------")
return heartRes
}
......@@ -137,6 +140,7 @@ func DeviceInfoResp(params ...interface{}) *nodemanagerV1.WorkerMessage {
},
},
}
log.Info("---------------------------------------Send device info msg ------------------------------------")
return deviceInfoRes
}
......@@ -147,6 +151,7 @@ func DeviceUsageResp(params ...interface{}) *nodemanagerV1.WorkerMessage {
DeviceUsage: &nodemanagerV1.DeviceUsageResponse{},
},
}
log.Info("---------------------------------------Send device usage msg ------------------------------------")
return deviceInfoRes
}
......@@ -159,6 +164,7 @@ func StatusResp(params ...interface{}) *nodemanagerV1.WorkerMessage {
},
},
}
log.Info("---------------------------------------Send device status msg ------------------------------------")
return statusRes
}
......@@ -175,6 +181,7 @@ func GoodbyeResp(params ...interface{}) *nodemanagerV1.WorkerMessage {
},
},
}
log.Info("---------------------------------------Send good bye msg ------------------------------------")
return goodbyeMsgRes
}
......@@ -196,5 +203,6 @@ func SubmitResultResp(params ...interface{}) *nodemanagerV1.WorkerMessage {
},
},
}
log.Info("---------------------------------------Send task result msg ------------------------------------")
return submitResultMsgRes
}
......@@ -2,6 +2,7 @@ package nm
import (
"context"
"example.com/m/conf"
"example.com/m/log"
"example.com/m/models"
"example.com/m/operate"
......@@ -29,25 +30,25 @@ func init() {
nodeManagerIsDel = make(map[string]bool, 0)
nodeManagerChan = make(chan *models.NodeManagerClient, 0)
nodeManagerMsgChan = make(chan *nodeManagerV1.ManagerMessage, 0)
//seed := conf.GetConfig().NmSeed
//log.Info("Nm seed url:", seed)
//seedServiceClient := operate.ConnNmGrpc(seed)
//if seedServiceClient == nil {
// panic("Dial nm seed service client failed")
//}
//ctx, cancel := context.WithTimeout(context.Background(), time.Second*10)
//defer cancel()
//list, err := seedServiceClient.ManagerList(ctx, &nodeManagerV1.ManagerListRequest{}, grpc.EmptyCallOption{})
//if err != nil {
// panic(fmt.Sprintf("Get manager list failed : %s", err.Error()))
//}
//if list.GetManagers() == nil || len(list.GetManagers()) == 0 {
// panic("Get manager list failed,the manager list is nil")
//}
//for _, node := range list.GetManagers() {
// nodeManagers = append(nodeManagers, node)
// nodeManagerIsDel[node.Publickey] = false
//}
seed := conf.GetConfig().NmSeed
log.Info("Nm seed url:", seed)
seedServiceClient := operate.ConnNmGrpc(seed)
if seedServiceClient == nil {
panic("Dial nm seed service client failed")
}
ctx, cancel := context.WithTimeout(context.Background(), time.Second*10)
defer cancel()
list, err := seedServiceClient.ManagerList(ctx, &nodeManagerV1.ManagerListRequest{}, grpc.EmptyCallOption{})
if err != nil {
panic(fmt.Sprintf("Get manager list failed : %s", err.Error()))
}
if list.GetManagers() == nil || len(list.GetManagers()) == 0 {
panic("Get manager list failed,the manager list is nil")
}
for _, node := range list.GetManagers() {
nodeManagers = append(nodeManagers, node)
nodeManagerIsDel[node.Publickey] = false
}
}
func StartMonitor() {
......@@ -57,10 +58,10 @@ func StartMonitor() {
panic("Docker client is not healthy")
}
go monitorModelInfo(dockerOp)
go monitorWorker(dockerOp)
go monitorModelInfo(dockerOp)
for _, manager := range nodeManagers {
// TODO: apply selection rules to the index and randomly pick one of the nodeManagers to connect to
isSuccess := inputNodeManagerChan(manager)
......@@ -128,11 +129,14 @@ func monitorWorker(op *operate.DockerOp) {
proofWorker := validator.NewProofWorker()
// Report model info
go reportImageInfo(nodeManager, worker, msgRespWorker, op)
go reportModelInfo(nodeManager, worker, msgRespWorker, op)
// Proof reporting
// Proof storage
go proofWorker.ProofStorage()
// Proof commit
go proofWorker.CommitWitness()
// Handle other messages
go handlerMsg(nodeManager, worker, msgRespWorker, taskMsgWorker, proofWorker)
......@@ -172,7 +176,7 @@ func monitorModelInfo(dockerOp *operate.DockerOp) {
Pwd: "",
Repository: "",
SignUrl: "http://192.168.1.120:8888/llm/test/get/sign",
ImageName: "llm-server:test",
ImageName: "onlydd/llm-server:0119",
DiskSize: 10000,
MemorySize: 10000,
IsImageExist: false,
......@@ -188,22 +192,25 @@ func monitorModelInfo(dockerOp *operate.DockerOp) {
// todo: if resources are sufficient
if isPull {
go dockerOp.PullImage(modelInfo)
dockerOp.SignApi[modelInfo.ImageName] = modelInfo.SignUrl
modelInfo.IsImageExist = true
dockerOp.ModelTaskIdChan <- modelInfo.TaskId
}
} else if !dockerOp.IsReportModelTaskId[modelInfo.TaskId] {
dockerOp.ModelTaskIdChan <- modelInfo.TaskId
dockerOp.IsReportModelTaskId[modelInfo.TaskId] = true
}
dockerOp.SignApi[modelInfo.ImageName] = modelInfo.SignUrl
dockerOp.ModelsInfo = append(dockerOp.ModelsInfo, modelInfo)
}
func reportImageInfo(nodeManager *models.NodeManagerClient,
func reportModelInfo(nodeManager *models.NodeManagerClient,
worker nodeManagerV1.NodeManagerService_RegisterWorkerClient,
msgRespWorker *RespMsgWorker, dockerOp *operate.DockerOp) {
for {
select {
case taskId := <-dockerOp.ModelTaskIdChan:
params := buildParams(taskId)
msgRespWorker.RegisterMsgResp(nodeManager, worker, SubmitResultResp, params)
msgRespWorker.RegisterMsgResp(nodeManager, worker, SubmitResourceMapRes, params)
}
}
}
......@@ -241,7 +248,7 @@ func handlerMsg(nodeManager *models.NodeManagerClient,
taskMsgWorker.Wg.Wait()
taskResBytes := taskMsgWorker.TaskResp[taskMsg.TaskUuid]
isSuccess := taskMsgWorker.TaskIsSuccess[taskMsg.TaskUuid]
containerSign := taskMsgWorker.DockerOp.GetContainerSign(taskMsg)
containerSign := taskMsgWorker.DockerOp.GetContainerSign(taskMsg, taskResBytes)
if containerSign == nil || len(containerSign) == 0 {
log.Error("Container signing failed................")
isSuccess = false
......
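With the extra taskResBytes argument, the body that GetContainerSign marshals now carries the container output alongside the task id and parameters. A self-contained sketch of the resulting JSON shape (field values are made up; the struct mirrors the TaskReq hunk above):

```go
package main

import (
	"encoding/json"
	"fmt"
)

// TaskReq mirrors the struct from the models hunk; []byte fields are
// base64-encoded by encoding/json.
type TaskReq struct {
	TaskId     string `json:"task_id"`
	TaskParam  []byte `json:"task_param"`
	TaskResult []byte `json:"task_result"`
}

func main() {
	req := &TaskReq{
		TaskId:     "task-uuid-example",
		TaskParam:  []byte(`{"prompt":"hi"}`),
		TaskResult: []byte(`{"answer":"hello"}`),
	}
	body, err := json.Marshal(req)
	if err != nil {
		panic(err)
	}
	fmt.Println(string(body))
}
```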
......@@ -2,14 +2,13 @@ package nm
import (
"bytes"
"crypto/ecdsa"
cryptoRand "crypto/rand"
"encoding/json"
"example.com/m/conf"
"example.com/m/log"
"example.com/m/models"
"example.com/m/operate"
"fmt"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/crypto"
"github.com/golang/groupcache/lru"
baseV1 "github.com/odysseus/odysseus-protocol/gen/proto/go/base/v1"
......@@ -113,11 +112,18 @@ func (t *TaskHandler) ComputeTaskHandler(taskMsg *nodeManagerV1.PushTaskMessage)
return
}
imageId := ""
isFound := false
for _, image := range images {
if image.RepoTags[0] == taskCmd.ImageName {
if isFound {
break
}
for _, tag := range image.RepoTags {
if tag == taskCmd.ImageName {
imageId = image.ID
isFound = true
break
}
}
log.Println(image.ID)
}
......@@ -185,9 +191,10 @@ func (t *TaskHandler) GetMinerSign(msg *nodeManagerV1.PushTaskMessage, taskResul
reqHash := crypto.Keccak256Hash(msg.TaskParam)
respHash := crypto.Keccak256Hash(taskResult)
signHash := crypto.Keccak256Hash(bytes.NewBufferString(msg.TaskUuid).Bytes(), reqHash.Bytes(), respHash.Bytes())
sign, err := ecdsa.SignASN1(cryptoRand.Reader, conf.GetConfig().SignPrivateKey, signHash.Bytes())
sign, err := crypto.Sign(signHash.Bytes(), conf.GetConfig().SignPrivateKey)
if err != nil {
log.Error("custom task handler")
}
log.Info("Miner sign:", common.Bytes2Hex(sign))
return reqHash.Bytes(), respHash.Bytes(), sign
}
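Switching from ecdsa.SignASN1 to crypto.Sign produces a 65-byte [R || S || V] signature over the Keccak hash, so a verifier can recover the miner's address from the signature alone. A sketch of that verification side (package and function names are illustrative, not repository code):

```go
package validator

import (
	"github.com/ethereum/go-ethereum/common"
	"github.com/ethereum/go-ethereum/crypto"
)

// VerifyMinerSign rebuilds the hash exactly as GetMinerSign does and recovers
// the signer's address from the recoverable signature.
func VerifyMinerSign(taskUuid string, taskParam, taskResult, sign []byte) (common.Address, error) {
	reqHash := crypto.Keccak256Hash(taskParam)
	respHash := crypto.Keccak256Hash(taskResult)
	signHash := crypto.Keccak256Hash([]byte(taskUuid), reqHash.Bytes(), respHash.Bytes())
	pub, err := crypto.SigToPub(signHash.Bytes(), sign)
	if err != nil {
		return common.Address{}, err
	}
	return crypto.PubkeyToAddress(*pub), nil
}
```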
......@@ -10,7 +10,6 @@ import (
"github.com/docker/docker/api/types"
"github.com/docker/docker/api/types/container"
"github.com/docker/docker/client"
"github.com/docker/docker/pkg/stdcopy"
"github.com/docker/go-connections/nat"
"github.com/ethereum/go-ethereum/common"
nodemanagerv1 "github.com/odysseus/odysseus-protocol/gen/proto/go/nodemanager/v1"
......@@ -29,6 +28,7 @@ type DockerOp struct {
UsedExternalPort map[int64]bool
SignApi map[string]string
ModelsInfo []*models.ModelInfo
IsReportModelTaskId map[uint64]bool
ModelTaskIdChan chan uint64
}
......@@ -50,14 +50,17 @@ func NewDockerOp() *DockerOp {
dockerClient: dockerClient,
SignApi: make(map[string]string, 0),
ModelsInfo: make([]*models.ModelInfo, 0),
IsReportModelTaskId: make(map[uint64]bool, 0),
UsedExternalPort: make(map[int64]bool, 0),
ModelTaskIdChan: make(chan uint64, 0),
}
}
func (d *DockerOp) GetContainerSign(taskMsg *nodemanagerv1.PushTaskMessage) []byte {
func (d *DockerOp) GetContainerSign(taskMsg *nodemanagerv1.PushTaskMessage, taskRes []byte) []byte {
reqBody := &models.TaskReq{
TaskId: taskMsg.TaskUuid,
TaskParam: taskMsg.TaskParam,
TaskResult: taskRes,
}
body, err := json.Marshal(reqBody)
if err != nil {
......@@ -124,8 +127,7 @@ func (d *DockerOp) ListContainer() []types.Container {
}
func (d *DockerOp) CreateAndStartContainer(imageName string, dockerCmd *models.DockerCmd) (string, error) {
ctx, cancel := context.WithTimeout(context.Background(), time.Second*60)
defer cancel()
containerId, err := d.CreateContainer(imageName, dockerCmd)
if err != nil {
log.Error("Error creating container image failed: ", err)
......@@ -138,26 +140,28 @@ func (d *DockerOp) CreateAndStartContainer(imageName string, dockerCmd *models.D
log.Error("start container failed:", startContainerIsSuccess)
}
statusCh, errCh := d.dockerClient.ContainerWait(ctx, containerId, container.WaitConditionNotRunning)
select {
case err := <-errCh:
if err != nil {
panic(err)
}
case <-statusCh:
break
}
out, err := d.dockerClient.ContainerLogs(ctx, containerId, types.ContainerLogsOptions{ShowStdout: true})
if err != nil {
panic(err)
}
_, err = stdcopy.StdCopy(os.Stdout, os.Stderr, out)
if err != nil {
log.Error("std out put failed:", err)
return "", err
}
//ctx, cancel := context.WithTimeout(context.Background(), time.Second*60)
//defer cancel()
//statusCh, errCh := d.dockerClient.ContainerWait(ctx, containerId, container.WaitConditionNotRunning)
//select {
//case err := <-errCh:
// if err != nil {
// panic(err)
// }
//case <-statusCh:
// break
//}
//
//out, err := d.dockerClient.ContainerLogs(ctx, containerId, types.ContainerLogsOptions{ShowStdout: true})
//if err != nil {
// panic(err)
//}
//
//_, err = stdcopy.StdCopy(os.Stdout, os.Stderr, out)
//if err != nil {
// log.Error("std out put failed:", err)
// return "", err
//}
return containerId, nil
}
......@@ -259,7 +263,9 @@ func (d *DockerOp) PsImageNameMap() (map[string]bool, error) {
}
res := make(map[string]bool, 0)
for _, image := range images {
res[image.RepoTags[0]] = true
for _, tag := range image.RepoTags {
res[tag] = true
}
}
return res, nil
}
......
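Because every RepoTag is now indexed rather than only RepoTags[0], a lookup succeeds whichever tag a task references. An illustrative helper built on top of it (same package as DockerOp; not part of the commit):

```go
// needsPull reports whether imageName is absent from the local Docker images,
// using the tag map built by PsImageNameMap above.
func (d *DockerOp) needsPull(imageName string) (bool, error) {
	tags, err := d.PsImageNameMap()
	if err != nil {
		return false, err
	}
	return !tags[imageName], nil
}
```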
......@@ -14,6 +14,36 @@ import (
"testing"
)
func TestJson(t *testing.T) {
type DockerCmd struct {
ContainerPort string `json:"container_port"`
HostIp string
HostPort string
}
type TaskCmd struct {
ImageName string `json:"image_name"`
DockerCmd *DockerCmd `json:"docker_cmd"`
ApiUrl string `json:"api_url"`
}
taskCmd := &TaskCmd{
ImageName: "onlydd/llm-server:0119",
DockerCmd: &DockerCmd{
ContainerPort: "80",
},
ApiUrl: "https://192.168.1.120:5001/aigic",
}
marshal, err := json.Marshal(taskCmd)
if err != nil {
t.Fatalf("error marshalling task cmd: %s", err)
}
fmt.Println("marshal:", string(marshal))
}
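For reference, the untagged HostIp and HostPort fields marshal under their Go field names, so the test above prints roughly:

marshal: {"image_name":"onlydd/llm-server:0119","docker_cmd":{"container_port":"80","HostIp":"","HostPort":""},"api_url":"https://192.168.1.120:5001/aigic"}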
func TestTaskHandler_computeTaskHandler(t1 *testing.T) {
type fields struct {
wg *sync.WaitGroup
......
......@@ -14,7 +14,6 @@ import (
)
type ProofWorker struct {
lastCommitTime time.Time
productProofChan chan *witnessV1.Proof
consumeProofChan chan []*witnessV1.Proof
isCommitProof map[string]bool
......@@ -22,9 +21,9 @@ type ProofWorker struct {
func NewProofWorker() *ProofWorker {
return &ProofWorker{
lastCommitTime: time.Now(),
productProofChan: make(chan *witnessV1.Proof, 0),
consumeProofChan: make(chan []*witnessV1.Proof, 0),
isCommitProof: make(map[string]bool, 0),
}
}
......@@ -41,9 +40,6 @@ func (p *ProofWorker) ProductProof(taskId string, workLoad uint64, reqHash []byt
}
func (p *ProofWorker) ProofStorage() {
// Define the minute range
min := 40
max := 59
go func(productProofChan chan *witnessV1.Proof) {
for {
select {
......@@ -64,23 +60,27 @@ func (p *ProofWorker) ProofStorage() {
}
}
}(p.productProofChan)
// todo: needs to be changed to Minute
timer := time.NewTicker(time.Minute)
defer timer.Stop()
randomMinute := getRandInt()
for {
// todo: the commit time within each hour should be randomized
since := time.Since(p.lastCommitTime)
if since.Hours() == conf.GetConfig().TaskValidatorTime {
nowTime := time.Now()
rand.Seed(nowTime.UnixNano())
// Generate a random integer in the range [min, max]
randomNumber := rand.Intn(max-min+1) + min
if nowTime.Minute() > min && nowTime.Minute() < randomNumber {
select {
case <-timer.C:
min := time.Now().Minute()
// Check whether the current minute falls within the target window (minutes 40-59)
if min >= 40 && min <= 59 && min == randomMinute {
randomMinute = getRandInt()
proofs := make([]*witnessV1.Proof, 0)
// TODO: read the data out and consume it
iter, err := db.NewIterator()
if err != nil {
log.Error("db new iterator failed: ", err)
continue
}
// todo: the more data accumulates, the more iterations this loop takes, which hurts performance
if iter == nil {
log.Warn("level db iterator is nil")
continue
}
for iter.Next() {
proof := &witnessV1.Proof{}
err := json.Unmarshal(iter.Value(), proof)
......@@ -99,15 +99,16 @@ func (p *ProofWorker) ProofStorage() {
// return
//}
}
p.lastCommitTime = nowTime
if len(proofs) > 0 {
p.consumeProofChan <- proofs
}
}
}
}
}
func (p *ProofWorker) CommitWitness() {
validatorClient := operate.ConnValidatorGrpc("")
validatorClient := operate.ConnValidatorGrpc("192.168.1.180:9431")
for {
select {
case proofs := <-p.consumeProofChan:
......@@ -126,3 +127,7 @@ func (p *ProofWorker) CommitWitness() {
}
}
}
func getRandInt() int {
return rand.Intn(20) + 40
}
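getRandInt draws a minute in [40, 59] (rand.Intn(20) gives 0-19, plus 40), and the ticker loop above only commits when the current minute matches that draw, redrawing afterwards. One caveat: before Go 1.20 the top-level math/rand source is deterministic unless seeded, so the chosen minute repeats across restarts. A locally seeded variant, shown as a sketch only (package name assumed):

```go
package validator

import (
	"math/rand"
	"time"
)

// rng is seeded per process so the chosen commit minute varies across restarts
// even on Go versions before 1.20.
var rng = rand.New(rand.NewSource(time.Now().UnixNano()))

func randCommitMinute() int {
	return rng.Intn(20) + 40 // random minute in [40, 59]
}
```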