Commit 024fdef8 authored by vicotor's avatar vicotor

add more msg handler

parent f3f5c792
......@@ -28,7 +28,7 @@ func (wm *WorkerManager) taskResult(worker *Worker, task *odysseus.TaskContent,
}
func (wm *WorkerManager) computeTaskResult(worker *Worker, task *odysseus.TaskContent, result *omanager.SubmitTaskResult) (*odysseus.TaskProof, error) {
if worker.info.nodeInfo == nil {
if worker.info == nil {
return nil, errors.New("unknown worker node info")
}
//log.WithFields(log.Fields{
......@@ -89,7 +89,7 @@ func (wm *WorkerManager) computeTaskResult(worker *Worker, task *odysseus.TaskCo
// verify miner_signature
// miner_signature = sign(hash((task_id+hash(task_param)+hash(task_result)))
verified := utils.VerifySignature(utils.CombineBytes([]byte(result.TaskId), paramHash[:], resultHash[:]),
result.MinerSignature, utils.FromHex(worker.info.nodeInfo.Info.MinerPubkey))
result.MinerSignature, utils.FromHex(worker.info.Info.MinerPubkey))
log.WithFields(log.Fields{
"task-id": task.TaskId,
"minerSignatureVerify": verified,
......@@ -131,7 +131,7 @@ func (wm *WorkerManager) computeTaskResult(worker *Worker, task *odysseus.TaskCo
}
func (wm *WorkerManager) standardTaskResult(worker *Worker, task *odysseus.TaskContent, result *omanager.SubmitTaskResult) (*odysseus.TaskProof, error) {
if worker.info.nodeInfo == nil {
if worker.info == nil {
return nil, errors.New("unknown worker node info")
}
//log.WithFields(log.Fields{
......@@ -181,7 +181,7 @@ func (wm *WorkerManager) standardTaskResult(worker *Worker, task *odysseus.TaskC
// verify miner_signature
// miner_signature = sign(hash((task_id+hash(task_param)+hash(task_result)))
verified := utils.VerifySignature(utils.CombineBytes([]byte(result.TaskId), paramHash[:], resultHash[:]),
result.MinerSignature, utils.FromHex(worker.info.nodeInfo.Info.MinerPubkey))
result.MinerSignature, utils.FromHex(worker.info.Info.MinerPubkey))
log.WithFields(log.Fields{
"task-id": task.TaskId,
"minerSignatureVerify": verified,
......
package server
import (
"bytes"
"context"
"errors"
"github.com/ethereum/go-ethereum/common"
"github.com/google/uuid"
lru "github.com/hashicorp/golang-lru"
"github.com/odysseus/mogo/operator"
"github.com/odysseus/mogo/types"
"github.com/odysseus/nodemanager/standardlib"
odysseus "github.com/odysseus/odysseus-protocol/gen/proto/go/base/v1"
omanager "github.com/odysseus/odysseus-protocol/gen/proto/go/nodemanager/v2"
log "github.com/sirupsen/logrus"
"golang.org/x/crypto/sha3"
"strconv"
"time"
)
// workerInfo wraps the node info last reported by a worker.
type workerInfo struct {
	nodeInfo *omanager.NodeInfoResponse
}
// workerUsageInfo wraps the hardware-usage report last received from a worker.
type workerUsageInfo struct {
	hwUsage *omanager.HardwareUsage
}
......@@ -22,6 +27,7 @@ type sendMsgCallback struct {
callback func(err error) bool
}
type Worker struct {
wm *WorkerManager
quit chan interface{}
taskCh chan *dispatchTask
msgCh chan *omanager.WorkerMessage
......@@ -29,27 +35,32 @@ type Worker struct {
resultCh chan *omanager.SubmitTaskResult
uuid int64 // worker uuid in the local.
heartBeat int64
registed bool // worker is registed to this nm.
online bool
disconnect bool
nonce int
latestNmValue string
addFirstSucceed bool
lastFreeGpu *omanager.GPUInfo
info workerInfo
info *omanager.NodeInfoResponse
usage workerUsageInfo
workerAddr string // worker address from public-key
deviceInfoHash []byte
recentTask *lru.ARCCache
status string
errCh chan error
infoOp *operator.WorkerInfoOperator
installOp *operator.WorkerInstalledOperator
runningOp *operator.WorkerRunningOperator
stream omanager.NodeManagerService_RegisterWorkerServer
}
func (w *Worker) ProfitAccount() common.Address {
if w.info.nodeInfo != nil {
return common.HexToAddress(w.info.nodeInfo.Info.BenefitAddress)
if w.info != nil {
return common.HexToAddress(w.info.Info.BenefitAddress)
}
return common.Address{}
}
......@@ -123,6 +134,339 @@ func (w *Worker) RecvMessage() {
}
}
// doGoodBye handles a worker's goodbye message: it marks the worker offline,
// forwards the stated reason to the quit channel, and closes the task channel
// so no further tasks are dispatched.
//
// NOTE(review): close(w.taskCh) assumes no other goroutine can still send on
// taskCh (e.g. doFetchStdTask) after this runs — confirm the message loop
// serializes handlers and returns immediately after goodbye.
func (w *Worker) doGoodBye(msg *omanager.WorkerMessage_GoodbyeMessage) {
	w.online = false
	w.quit <- msg.GoodbyeMessage.Reason
	close(w.taskCh)
}
// doSubmitAck records a task-submission acknowledgement: if the task is still
// tracked in the recent-task LRU cache, the ack is attached to it. Acks for
// evicted or unknown task ids are silently dropped.
func (w *Worker) doSubmitAck(msg *omanager.WorkerMessage_SubmitTaskAck) {
	if v, ok := w.recentTask.Get(msg.SubmitTaskAck.TaskId); ok {
		dtask := v.(*dispatchTask)
		dtask.setAck(msg.SubmitTaskAck)
	}
}
// doSubmitResult forwards a submitted task result to the result channel for
// asynchronous processing. This send may block until the consumer reads it.
func (w *Worker) doSubmitResult(msg *omanager.WorkerMessage_SubmitTaskResult) {
	w.resultCh <- msg.SubmitTaskResult
}
// doHeartBeat processes a heartbeat response: the worker is flagged online,
// the current time is recorded as the last heartbeat, and the observed lag
// between now and the worker's timestamp is logged at debug level.
func (w *Worker) doHeartBeat(msg *omanager.WorkerMessage_HeartbeatResponse) {
	w.online = true
	w.heartBeat = time.Now().Unix()
	lag := time.Now().Unix() - int64(msg.HeartbeatResponse.Timestamp)
	log.WithFields(log.Fields{
		"worker-addr": w.workerAddr,
		"TTL":         lag,
	}).Debug("receive worker heartbeat")
}
// doUpdateBenefit applies a benefit-address change reported by the worker:
// the in-memory node info is updated (when present) and the new address is
// persisted to mongo.
func (w *Worker) doUpdateBenefit(msg *omanager.WorkerMessage_BenefitAddrUpdate) {
	if w.info != nil {
		w.info.Info.BenefitAddress = msg.BenefitAddrUpdate.BenefitAddress
	}
	// update to mogo. The original discarded this error silently; log it so
	// a cache/mongo divergence is at least visible.
	if err := w.infoOp.UpdateBenefitAddr(context.TODO(), w.workerAddr, msg.BenefitAddrUpdate.BenefitAddress); err != nil {
		log.WithError(err).Error("update worker benefit address failed")
	}
}
// doGetNodeInfo caches the node info reported by the worker. Only the first
// report is stored; subsequent reports are currently ignored.
func (w *Worker) doGetNodeInfo(msg *omanager.WorkerMessage_NodeInfo) {
	if w.info != nil {
		// todo: do update ?
		return
	}
	w.info = &omanager.NodeInfoResponse{
		Info:     msg.NodeInfo.Info,
		Hardware: msg.NodeInfo.Hardware,
		Models:   msg.NodeInfo.Models,
	}
}
// doFetchStdTask serves a worker's request for a standard (benchmark) task.
// It looks up a template task of the requested type, generates fresh
// parameters via the matching std lib, and dispatches the task through the
// worker's task channel. No-op until node info has been received.
func (w *Worker) doFetchStdTask(msg *omanager.WorkerMessage_FetchStandardTask) {
	// Ignore requests until the worker has reported its node info.
	if w.info == nil {
		return
	}
	log.WithFields(log.Fields{
		"worker-addr": w.workerAddr,
	}).Debugf("receive worker fetch std task request:%v", msg.FetchStandardTask.TaskType)
	pushTask := standardlib.StdTask{}
	task, exist := w.wm.std.GetTask(msg.FetchStandardTask.TaskType)
	if exist {
		stdlib := w.wm.std.GetStdLib(task.TaskType)
		if stdlib == nil {
			log.WithField("task-type", task.TaskType).Warn("not found std lib")
			return
		}
		// Copy the template, then stamp it with a fresh id and parameters.
		pushTask = task
		pushTask.TaskId = uuid.NewString()
		param, err := stdlib.GenerateParam(0)
		if err != nil {
			log.WithError(err).WithField("task-type", task.TaskType).Error("generate param failed")
			return
		}
		pushTask.TaskParam = []byte(param)
		pushTask.TaskInLen = int32(len(param))
		// Standard tasks run under the reserved uid "0" and carry no fee.
		pushTask.TaskUid = "0"
		pushTask.TaskTimestamp = uint64(time.Now().UnixNano())
		pushTask.TaskKind = odysseus.TaskKind_StandardTask
		pushTask.TaskFee = "0"
		dtask := newDispatchTask(w, &pushTask.TaskContent)
		// May block until the dispatch loop consumes the task.
		// NOTE(review): panics if taskCh was already closed by doGoodBye —
		// assumes message handlers are serialized; confirm.
		w.taskCh <- dtask
	} else {
		log.WithField("task-type", msg.FetchStandardTask.TaskType).Warn("not found std task")
	}
}
func (w *Worker) doGetDeviceInfo(msg *omanager.WorkerMessage_DeviceInfo) {
if w.info == nil {
return
}
log.WithFields(log.Fields{
"worker-addr": w.workerAddr,
}).Debugf("receive worker device info:%v", msg.DeviceInfo)
{
var infoHash [32]byte
infoHash = sha3.Sum256([]byte(msg.DeviceInfo.String()))
if bytes.Compare(infoHash[:], w.deviceInfoHash) != 0 && w.registed {
// check device info changed, and update to mogo.
w.infoOp.UpdateHardware(context.TODO(), w.workerAddr, types.PbToHardwareInfo(msg.DeviceInfo.Hardware))
}
// update local cache.
w.deviceInfoHash = infoHash[:]
w.info.Hardware = msg.DeviceInfo.Hardware
}
}
// doDeviceUsage caches the latest hardware-usage report and, for registered
// workers, persists it to mongo. Returns the persistence error, if any.
func (w *Worker) doDeviceUsage(msg *omanager.WorkerMessage_DeviceUsage) error {
	usage := msg.DeviceUsage.Usage
	w.usage.hwUsage = usage
	log.WithFields(log.Fields{
		"worker-addr": w.workerAddr,
	}).Debugf("receive worker device usage:%v", usage)
	// Unregistered workers are cached in memory only.
	if !w.registed {
		return nil
	}
	// 1. update usage to hardware mogo.
	return w.infoOp.UpdateHardwareUsage(context.TODO(), w.workerAddr, types.PbToDeviceUsage(usage))
}
// doAddRunningModel records models newly running on the worker: the models
// are appended to the worker-info document and then mirrored into the
// worker-running collection. Ignored for unregistered workers. If the first
// mongo write fails, the mirror step is skipped.
func (w *Worker) doAddRunningModel(msg *omanager.WorkerMessage_AddModelRunning) {
	if !w.registed {
		return
	}
	models := make([]*types.RunningModel, 0)
	log.WithFields(log.Fields{
		"worker-addr": w.workerAddr,
		"count":       len(msg.AddModelRunning.Models),
	}).Debugf("receive worker add running model:%v", msg.AddModelRunning.Models)
	for _, model := range msg.AddModelRunning.Models {
		models = append(models, types.PbToRunningModel(model))
	}
	// 1. update model to worker info mogo.
	err := w.infoOp.AddRunningModels(context.TODO(), w.workerAddr, models)
	if err != nil {
		log.WithError(err).Error("add worker running model failed")
		return
	}
	// 2. add info to worker running model.
	running := make([]*operator.WorkerRunningInfo, len(models))
	for i, model := range models {
		// NOTE(review): model ids are assumed to be numeric strings; the
		// Atoi error is ignored (id becomes 0 on failure) — confirm.
		id, _ := strconv.Atoi(model.ModelID)
		running[i] = &operator.WorkerRunningInfo{
			WorkerId: w.workerAddr,
			ModelId:  id,
			ExecTime: model.ExecTime,
		}
	}
	_, err = w.runningOp.InsertMany(context.TODO(), running)
	if err != nil {
		log.WithError(err).Error("add worker running info failed")
	}
}
// doRemoveRunningModel removes models no longer running on the worker: the
// ids are deleted from the worker-info document and then from the
// worker-running collection. Ignored for unregistered workers. If the first
// mongo delete fails, the mirror step is skipped.
func (w *Worker) doRemoveRunningModel(msg *omanager.WorkerMessage_DelModeRunning) {
	if !w.registed {
		return
	}
	models := make([]int, 0)
	log.WithFields(log.Fields{
		"worker-addr": w.workerAddr,
		"count":       len(msg.DelModeRunning.ModelIds),
	}).Debugf("receive worker remove running model:%v", msg.DelModeRunning.ModelIds)
	for _, model := range msg.DelModeRunning.ModelIds {
		// NOTE(review): Atoi error ignored — a non-numeric id becomes 0.
		id, _ := strconv.Atoi(model)
		models = append(models, id)
	}
	// 1. remove model from worker info mogo.
	err := w.infoOp.DeleteRunningModels(context.TODO(), w.workerAddr, models)
	if err != nil {
		log.WithError(err).Error("remove worker running model failed")
		return
	}
	// 2. remove info from worker running model.
	_, err = w.runningOp.DeleteMany(context.TODO(), w.workerAddr, models)
	if err != nil {
		log.WithError(err).Error("remove worker running info failed")
	}
}
// doAddInstalledModel records models newly installed on the worker: the
// models are appended to the worker-info document and mirrored into the
// worker-installed collection, tagged with the free memory of the GPU that
// currently has the most available memory. Ignored for unregistered workers.
func (w *Worker) doAddInstalledModel(msg *omanager.WorkerMessage_AddModelInstalled) {
	if !w.registed {
		return
	}
	models := make([]*types.InstalledModel, 0, len(msg.AddModelInstalled.Models))
	log.WithFields(log.Fields{
		"worker-addr": w.workerAddr,
		"count":       len(msg.AddModelInstalled.Models),
	}).Debugf("receive worker add installed model:%v", msg.AddModelInstalled.Models)
	for _, model := range msg.AddModelInstalled.Models {
		models = append(models, types.PbToInstalledModel(model))
	}
	// 1. update model to worker info mogo.
	err := w.infoOp.AddInstalledModels(context.TODO(), w.workerAddr, models)
	if err != nil {
		log.WithError(err).Error("add worker installed model failed")
		return
	}
	// Guard: the original indexed w.info.Hardware.GPU[0] unconditionally and
	// would panic for a worker without node info or without any GPU.
	if w.info == nil || w.info.Hardware == nil || len(w.info.Hardware.GPU) == 0 {
		log.WithField("worker-addr", w.workerAddr).Warn("no GPU info available, skip installed info mirror")
		return
	}
	// Reuse the shared helper instead of duplicating the max-free scan.
	maxGpuFree := w.getMaxGPUFree()
	// 2. add info to worker installed model.
	installed := make([]*operator.WorkerInstalledInfo, len(models))
	for i, model := range models {
		// NOTE(review): model ids assumed numeric; Atoi error ignored as in
		// the sibling handlers.
		id, _ := strconv.Atoi(model.ModelID)
		installed[i] = &operator.WorkerInstalledInfo{
			WorkerId: w.workerAddr,
			ModelId:  id,
			GpuFree:  maxGpuFree.MemFree,
			GpuSeq:   int(maxGpuFree.Seq),
		}
	}
	_, err = w.installOp.InsertMany(context.TODO(), installed)
	if err != nil {
		log.WithError(err).Error("add worker installed info failed")
	}
}
// doRemoveInstalledModel removes models uninstalled from the worker: the ids
// are deleted from the worker-info document and then from the
// worker-installed collection. If the first mongo delete fails, the mirror
// step is skipped.
func (w *Worker) doRemoveInstalledModel(msg *omanager.WorkerMessage_DelModelInstalled) {
	// The registration guard is intentionally disabled here (see history).
	//if !w.registed {
	//	return
	//}
	models := make([]int, 0)
	log.WithFields(log.Fields{
		"worker-addr": w.workerAddr,
		"count":       len(msg.DelModelInstalled.ModelIds),
	}).Debugf("receive worker remove installed model:%v", msg.DelModelInstalled.ModelIds)
	for _, model := range msg.DelModelInstalled.ModelIds {
		// NOTE(review): Atoi error ignored — a non-numeric id becomes 0.
		id, _ := strconv.Atoi(model)
		models = append(models, id)
	}
	// 1. remove model from worker info mogo.
	err := w.infoOp.DeleteInstalledModels(context.TODO(), w.workerAddr, models)
	if err != nil {
		log.WithError(err).Error("remove worker installed model failed")
		return
	}
	// 2. remove info from worker installed model.
	_, err = w.installOp.DeleteMany(context.TODO(), w.workerAddr, models)
	if err != nil {
		log.WithError(err).Error("remove worker installed info failed")
	}
}
// doInstalledModelStatus persists the last-run time reported for an
// installed model to the worker-info document.
func (w *Worker) doInstalledModelStatus(msg *omanager.WorkerMessage_InstalledModelStatus) {
	//if !w.registed {
	//	return
	//}
	status := msg.InstalledModelStatus
	log.WithFields(log.Fields{
		"worker-addr": w.workerAddr,
	}).Debugf("receive worker installed model status:%v", status)
	// 1. update model status to worker info mogo.
	modelID, _ := strconv.Atoi(status.ModelId)
	if err := w.infoOp.UpdateInstalledModelStatus(context.TODO(), w.workerAddr, modelID, status.LastRunTime); err != nil {
		log.WithError(err).Error("update worker installed model status failed")
	}
}
// doRunningModelStatus persists the last-work time, total run count, and
// execution time reported for a running model to the worker-info document.
func (w *Worker) doRunningModelStatus(msg *omanager.WorkerMessage_RunningModelStatus) {
	//if !w.registed {
	//	return
	//}
	status := msg.RunningModelStatus
	log.WithFields(log.Fields{
		"worker-addr": w.workerAddr,
	}).Debugf("receive worker running model status:%v", status)
	// 1. update model status to worker info mogo.
	modelID, _ := strconv.Atoi(status.ModelId)
	if err := w.infoOp.UpdateRunningModelStatus(context.TODO(), w.workerAddr, modelID, status.LastWorkTime,
		int64(status.TotalRunCount), int(status.ExecTime)); err != nil {
		log.WithError(err).Error("update worker running model status failed")
	}
}
// doGPUUsage handles a per-GPU usage report: it persists the usage to mongo,
// refreshes the cached GPU entries (matched by sequence number), and — when
// the GPU with the most free memory changed — updates the free-memory hint
// used by the installed-model records. Ignored until a hardware-usage report
// has arrived.
func (w *Worker) doGPUUsage(msg *omanager.WorkerMessage_GpuUsage) {
	if w.usage.hwUsage == nil {
		return
	}
	// Guard: the original dereferenced w.info.Hardware.GPU with only the
	// usage check above, panicking when node info had not been received or
	// the worker reports no GPUs.
	if w.info == nil || w.info.Hardware == nil || len(w.info.Hardware.GPU) == 0 {
		return
	}
	log.WithFields(log.Fields{
		"worker-addr": w.workerAddr,
	}).Debugf("receive worker gpu usage:%v", msg.GpuUsage)
	// 1. update gpu usage to hardware mogo.
	gpuusages := make([]types.GpuUsage, 0, len(msg.GpuUsage.Usages))
	for _, gpu := range msg.GpuUsage.Usages {
		gpuusages = append(gpuusages, types.PbToGpuUsage(gpu))
	}
	err := w.infoOp.UpdateGPUUsage(context.TODO(), w.workerAddr, gpuusages)
	if err != nil {
		log.WithError(err).Error("update worker gpu usage failed")
	}
	// 2. update gpu usage to worker usage (entries matched by GPU Seq).
	for _, usage := range msg.GpuUsage.Usages {
		for _, gpu := range w.info.Hardware.GPU {
			if gpu.Seq == usage.Seq {
				gpu.Usage = usage.Usage
				gpu.MemFree = usage.MemFree
				gpu.PowerRt = usage.PowerRt
				gpu.Temp = usage.Temp
			}
		}
	}
	// 3. get max gpu free; persist only when it actually changed.
	maxFree := w.getMaxGPUFree()
	if w.lastFreeGpu != nil && maxFree.Seq == w.lastFreeGpu.Seq && maxFree.MemFree == w.lastFreeGpu.MemFree {
		// no need update
		return
	}
	// NOTE(review): UpdateGpuFree's result is discarded, as in the original.
	w.installOp.UpdateGpuFree(context.TODO(), w.workerAddr, int64(maxFree.MemFree), int(maxFree.Seq))
	w.lastFreeGpu = maxFree
}
// getMaxGPUFree returns the cached GPU entry with the most free memory.
//
// Precondition: w.info (and w.info.Hardware) must be set and list at least
// one GPU; otherwise this panics with a nil dereference or index out of
// range. Callers are expected to guard before calling.
func (w *Worker) getMaxGPUFree() *omanager.GPUInfo {
	maxGpuFree := w.info.Hardware.GPU[0]
	for _, gpu := range w.info.Hardware.GPU {
		if gpu.MemFree > maxGpuFree.MemFree {
			maxGpuFree = gpu
		}
	}
	return maxGpuFree
}
func (w *Worker) ModelOperate(info interface{}, operate omanager.ModelOperateType) *omanager.ManagerMessage_ModelOperateRequest {
request := &omanager.ManagerMessage_ModelOperateRequest{
ModelOperateRequest: &omanager.ModelOperateRequest{
......@@ -139,5 +483,4 @@ func (w *Worker) ModelOperate(info interface{}, operate omanager.ModelOperateTyp
},
}
return request
}
package server
import (
"bytes"
"encoding/hex"
"errors"
"fmt"
"github.com/golang/protobuf/proto"
"github.com/google/uuid"
lru "github.com/hashicorp/golang-lru"
"github.com/odysseus/cache/cachedata"
"github.com/odysseus/mogo/operator"
......@@ -19,7 +17,6 @@ import (
"github.com/redis/go-redis/v9"
log "github.com/sirupsen/logrus"
"go.mongodb.org/mongo-driver/mongo"
"golang.org/x/crypto/sha3"
"math/big"
"regexp"
"strconv"
......@@ -96,18 +93,6 @@ func (wm *WorkerManager) Stop() {
close(wm.quit)
}
// UpdateHeartBeat records the current unix time as the latest heartbeat for
// the worker identified by uuid. Guarded by the heartbeat RW lock.
func (wm *WorkerManager) UpdateHeartBeat(uuid int64) {
	wm.hbRwLock.Lock()
	defer wm.hbRwLock.Unlock()
	wm.heartBeat[uuid] = time.Now().Unix()
}
// GetHeartBeat returns the last recorded heartbeat time (unix seconds) for
// the worker identified by uuid, or 0 if none was recorded. Guarded by the
// heartbeat RW lock.
func (wm *WorkerManager) GetHeartBeat(uuid int64) int64 {
	wm.hbRwLock.RLock()
	defer wm.hbRwLock.RUnlock()
	return wm.heartBeat[uuid]
}
func (wm *WorkerManager) GetWorker(uuid int64) *Worker {
wm.wkRwLock.RLock()
defer wm.wkRwLock.RUnlock()
......@@ -160,6 +145,7 @@ func (wm *WorkerManager) AddNewWorker(id int64, worker omanager.NodeManagerServi
return nil, ErrWorkerExist
}
w := &Worker{
wm: wm,
quit: make(chan interface{}),
errCh: make(chan error, 1),
taskCh: make(chan *dispatchTask),
......@@ -171,11 +157,14 @@ func (wm *WorkerManager) AddNewWorker(id int64, worker omanager.NodeManagerServi
registed: false,
online: false,
info: workerInfo{},
info: nil,
workerAddr: "",
deviceInfoHash: nil,
status: "",
stream: worker,
infoOp: wm.workerInfoOperator,
installOp: wm.workerInstalledOperator,
runningOp: wm.workerRunningOperator,
}
taskCache, err := lru.NewARC(100)
......@@ -283,7 +272,7 @@ func (wm *WorkerManager) manageWorker(worker *Worker) error {
return nil
case <-workerCheckTicker.C:
if worker.info.nodeInfo != nil {
if worker.info != nil {
nodeinfoTicker.Reset(time.Minute * 30)
}
......@@ -435,7 +424,7 @@ func (wm *WorkerManager) handleWorkerMsg(worker *Worker) {
return
case <-workerCheckTicker.C:
if time.Now().Unix()-wm.GetHeartBeat(worker.uuid) > int64(checkDuration) {
if time.Now().Unix()-worker.heartBeat > int64(checkDuration) {
log.WithField("worker-uuid", worker.uuid).Error("worker heartbeat expired")
worker.quit <- ErrHeartBeatExpired
return
......@@ -457,212 +446,48 @@ func (wm *WorkerManager) handleWorkerMsg(worker *Worker) {
worker.online = true
switch msg := wmsg.Message.(type) {
case *omanager.WorkerMessage_GoodbyeMessage:
worker.online = false
worker.quit <- msg.GoodbyeMessage.Reason
close(worker.taskCh)
worker.doGoodBye(msg)
return
case *omanager.WorkerMessage_SubmitTaskAck:
if v, ok := worker.recentTask.Get(msg.SubmitTaskAck.TaskId); ok {
dtask := v.(*dispatchTask)
dtask.setAck(msg.SubmitTaskAck)
}
worker.doSubmitAck(msg)
case *omanager.WorkerMessage_SubmitTaskResult:
worker.resultCh <- msg.SubmitTaskResult
worker.doSubmitResult(msg)
case *omanager.WorkerMessage_HeartbeatResponse:
worker.online = true
wm.UpdateHeartBeat(worker.uuid)
l.WithFields(log.Fields{
"worker-addr": worker.workerAddr,
"heartBeat": time.Now().Unix() - int64(msg.HeartbeatResponse.Timestamp),
}).Debug("receive worker heartbeat")
worker.doHeartBeat(msg)
case *omanager.WorkerMessage_BenefitAddrUpdate:
worker.doUpdateBenefit(msg)
case *omanager.WorkerMessage_NodeInfo:
// todo: remove this message.
nodeinfo := msg.NodeInfo
log.WithField("worker-addr", worker.workerAddr).Debugf("receive worker node info:%v", nodeinfo)
if nodeinfo.Hardware != nil && nodeinfo.Hardware.NET != nil {
matched, err := regexp.MatchString("((2(5[0-5]|[0-4]\\d))|[0-1]?\\d{1,2})(\\.((2(5[0-5]|[0-4]\\d))|[0-1]?\\d{1,2})){3}",
nodeinfo.Hardware.NET.Ip)
if err != nil {
l.WithField("nodeinfo.ip", nodeinfo.Hardware.NET.Ip).Error("ip匹配出现错误")
//return
}
if !matched {
// clean ip content if not match.
nodeinfo.Hardware.NET.Ip = ""
}
}
worker.info.nodeInfo = msg.NodeInfo
var addr = ""
if pubkey, err := utils.HexToPubkey(nodeinfo.Info.MinerPubkey); err != nil {
l.WithFields(log.Fields{
"worker-addr": worker.workerAddr,
"error": err,
}).Error("parse pubkey failed")
} else {
addr = utils.PubkeyToAddress(pubkey)
}
if addr == worker.workerAddr || addr == "" {
// addr is not change.
continue
}
// checkout addr exist.
if worker.workerAddr == "" {
if w := wm.GetWorkerByAddr(addr); w != nil {
log.WithField("worker-addr", addr).Error("worker with the address is existed")
return
}
}
if worker.workerAddr != "" {
// todo: worker change pubkey.
wm.InActiveWorker(worker)
}
// update new worker.
wm.SetWorkerAddr(worker, addr)
worker.doGetNodeInfo(msg)
case *omanager.WorkerMessage_FetchStandardTask:
if worker.info.nodeInfo == nil {
continue
}
l.WithFields(log.Fields{
"worker-addr": worker.workerAddr,
}).Debugf("receive worker fetch std task request:%v", msg.FetchStandardTask.TaskType)
pushTask := standardlib.StdTask{}
task, exist := wm.std.GetTask(msg.FetchStandardTask.TaskType)
if exist {
stdlib := wm.std.GetStdLib(task.TaskType)
if stdlib == nil {
l.WithField("task-type", task.TaskType).Warn("not found std lib")
continue
}
pushTask = task
pushTask.TaskId = uuid.NewString()
param, err := stdlib.GenerateParam(0)
if err != nil {
l.WithError(err).WithField("task-type", task.TaskType).Error("generate param failed")
continue
}
pushTask.TaskParam = []byte(param)
pushTask.TaskInLen = int32(len(param))
pushTask.TaskUid = "0"
pushTask.TaskTimestamp = uint64(time.Now().UnixNano())
pushTask.TaskKind = odysseus.TaskKind_StandardTask
pushTask.TaskFee = "0"
dtask := newDispatchTask(worker, &pushTask.TaskContent)
worker.taskCh <- dtask
break
} else {
l.WithField("task-type", msg.FetchStandardTask.TaskType).Warn("not found std task")
}
worker.doFetchStdTask(msg)
case *omanager.WorkerMessage_DeviceInfo:
l.WithFields(log.Fields{
"worker-addr": worker.workerAddr,
}).Debugf("receive worker device info:%v", msg.DeviceInfo)
if !worker.registed {
// ignore the info.
continue
}
// todo: verify signature
{
var infoHash [32]byte
infoHash = sha3.Sum256([]byte(msg.DeviceInfo.String()))
// update local cache.
worker.info.nodeInfo.Hardware = msg.DeviceInfo.Hardware
// check device info changed, and update to cache.
if bytes.Compare(infoHash[:], worker.deviceInfoHash) != 0 {
wm.UpdateWorkerDeviceInfo(worker, msg.DeviceInfo)
}
worker.deviceInfoHash = infoHash[:]
}
worker.doGetDeviceInfo(msg)
case *omanager.WorkerMessage_DeviceUsage:
if !worker.registed {
continue
}
wm.UpdateWorkerUsageInfo(worker, msg.DeviceUsage)
worker.usage.hwUsage = msg.DeviceUsage.Usage
l.WithFields(log.Fields{
"worker-addr": worker.workerAddr,
}).Debugf("receive worker device usage:%v", msg.DeviceUsage.Usage)
worker.doDeviceUsage(msg)
case *omanager.WorkerMessage_AddModelRunning:
if !worker.registed {
continue
}
l.WithFields(log.Fields{
"worker-addr": worker.workerAddr,
"model count": len(msg.AddModelRunning.Models),
}).Debugf("receive worker add model running:%v", msg.AddModelRunning.Models)
// todo: add worker running model.
worker.doAddRunningModel(msg)
case *omanager.WorkerMessage_DelModeRunning:
if !worker.registed {
continue
}
l.WithFields(log.Fields{
"worker-addr": worker.workerAddr,
"model count": len(msg.DelModeRunning.ModelIds),
}).Debugf("receive worker del model running:%v", msg.DelModeRunning.ModelIds)
// todo: del worker running model with model_id.
worker.doRemoveRunningModel(msg)
case *omanager.WorkerMessage_AddModelInstalled:
if !worker.registed {
continue
}
l.WithFields(log.Fields{
"worker-addr": worker.workerAddr,
"model count": len(msg.AddModelInstalled.Models),
}).Debugf("receive worker add model installed:%v", msg.AddModelInstalled.Models)
// todo: add worker installed model with model_id.
worker.doAddInstalledModel(msg)
case *omanager.WorkerMessage_DelModelInstalled:
if !worker.registed {
continue
}
l.WithFields(log.Fields{
"worker-addr": worker.workerAddr,
"model count": len(msg.DelModelInstalled.ModelIds),
}).Debugf("receive worker del model installed:%v", msg.DelModelInstalled.ModelIds)
// todo: del worker installed model with model_id.
worker.doRemoveInstalledModel(msg)
case *omanager.WorkerMessage_InstalledModelStatus:
if !worker.registed {
continue
}
l.WithFields(log.Fields{
"worker-addr": worker.workerAddr,
"model": len(msg.InstalledModelStatus.ModelId),
"type": "status",
}).Debugf("receive worker installed model status:%v", msg.InstalledModelStatus)
// todo: update worker installed model status.
worker.doInstalledModelStatus(msg)
case *omanager.WorkerMessage_RunningModelStatus:
if !worker.registed {
continue
}
l.WithFields(log.Fields{
"worker-addr": worker.workerAddr,
"model": len(msg.RunningModelStatus.ModelId),
"type": "status",
}).Debugf("receive worker running model status:%v", msg.RunningModelStatus)
// todo: update worker running model status.
worker.doRunningModelStatus(msg)
case *omanager.WorkerMessage_GpuUsage:
if !worker.registed {
continue
}
l.WithFields(log.Fields{
"worker-addr": worker.workerAddr,
"usage count": len(msg.GpuUsage.Usages),
}).Debugf("receive worker gpu usage:%v", msg.GpuUsage.Usages)
// todo: update worker gpu usage info.
worker.doGPUUsage(msg)
case *omanager.WorkerMessage_RegisteMessage:
// 1. do some verify.
......@@ -675,7 +500,6 @@ func (wm *WorkerManager) handleWorkerMsg(worker *Worker) {
// 2. check signature.
info := msg.RegisteMessage.Info
{
hardware := msg.RegisteMessage.Hardware
sig := msg.RegisteMessage.DeviceSignature
data := utils.CombineBytes([]byte(info.String()),
......@@ -732,7 +556,7 @@ func (wm *WorkerManager) handleWorkerMsg(worker *Worker) {
if !matched {
msg.RegisteMessage.Hardware.NET.Ip = ""
}
worker.info.nodeInfo = &omanager.NodeInfoResponse{
worker.info = &omanager.NodeInfoResponse{
Info: msg.RegisteMessage.Info,
Hardware: msg.RegisteMessage.Hardware,
Models: msg.RegisteMessage.Models,
......@@ -837,7 +661,7 @@ func (wm *WorkerManager) makeTaskProof(worker *Worker, task *odysseus.TaskConten
TaskContainerSignature: hex.EncodeToString(t.containerSignature),
TaskMinerSignature: hex.EncodeToString(t.minerSignature),
TaskWorkerAccount: worker.workerAddr,
TaskProfitAccount: worker.info.nodeInfo.Info.BenefitAddress,
TaskProfitAccount: worker.info.Info.BenefitAddress,
}
return proof
}
......@@ -30,26 +30,6 @@ func (w workerRegistry) Instance() string {
// Status returns the worker's current status string.
func (w workerRegistry) Status() string {
	// status is already a string; fmt.Sprintf("%s", s) was a pointless
	// format/allocation round-trip.
	return w.worker.status
}
// generateAGpuRam returns a random synthetic GPU RAM size in bytes:
// 8, 16, or 24 GiB with equal probability.
func generateAGpuRam() int {
	return 1024 * 1024 * 1024 * (rand.Intn(3)*8 + 8) // 8, 16, 24
}
// generateARam returns a synthetic (total, used) system RAM pair in bytes:
// a fixed 32 GiB total and a random 0–9 GiB used amount.
func generateARam() (int64, int64) {
	return 32 * 1024 * 1024 * 1024, int64(rand.Intn(10)) * 1024 * 1024 * 1024
}
// generateAGpuModel returns a random synthetic GPU model name among
// "Nvidia GTX 3063/3073/3083/3093" (3060 + rand(4)*10 + 3).
// NOTE(review): the comment claimed 3060–3090, but 3063 etc. are produced.
func generateAGpuModel() string {
	m := rand.Intn(4)*10 + 3060 // 3060 ~ 3090
	return fmt.Sprintf("Nvidia GTX %d", m)
}
// generateMac returns a random synthetic MAC address formatted as six
// colon-separated lowercase hex octets.
func generateMac() string {
	return fmt.Sprintf("%02x:%02x:%02x:%02x:%02x:%02x",
		rand.Intn(256), rand.Intn(256), rand.Intn(256),
		rand.Intn(256), rand.Intn(256), rand.Intn(256))
}
func (w workerRegistry) DetailInfo() (json.RawMessage, error) {
if w.worker == nil {
return nil, fmt.Errorf("worker is nil")
......@@ -73,9 +53,9 @@ func (w workerRegistry) DetailInfo() (json.RawMessage, error) {
ActiveNM: make([]string, 0),
TaskList: taskList,
}
if w.worker.info.nodeInfo != nil {
info.BenefitAddress = w.worker.info.nodeInfo.Info.BenefitAddress
info.IP = w.worker.info.nodeInfo.Hardware.NET.Ip
if w.worker.info != nil {
info.BenefitAddress = w.worker.info.Info.BenefitAddress
info.IP = w.worker.info.Hardware.NET.Ip
}
nmList, _ := w.wm.WorkerNmList(w.worker)
......@@ -85,27 +65,24 @@ func (w workerRegistry) DetailInfo() (json.RawMessage, error) {
info.ActiveNM = append(info.ActiveNM, endpoint)
}
}
info.HearBeat = w.wm.GetHeartBeat(w.worker.uuid) * 1000 // to ms
info.HearBeat = w.worker.heartBeat * 1000 // to ms
info.MinerAddress = w.worker.workerAddr
info.Nonce = 0
if w.worker.info.nodeInfo != nil {
info.CpuModel = w.worker.info.nodeInfo.Hardware.CPU.Model
info.CpuCore = int(w.worker.info.nodeInfo.Hardware.CPU.Cores)
if info.CpuCore == 0 {
info.CpuModel = "Intel(R) Xeon(R) CPU E5-2699 v4 @ 2.20GHz"
info.CpuCore = 8
if w.worker.info != nil {
info.CpuModel = w.worker.info.Hardware.CPU.Model
info.CpuCore = int(w.worker.info.Hardware.CPU.Cores)
info.GPUModel = w.worker.info.Hardware.GPU[0].Model
info.GPURam = int(w.worker.info.Hardware.GPU[0].MemTotal)
info.RamTotal = w.worker.info.Hardware.RAM.Total
info.RamUsed = info.RamTotal - w.worker.info.Hardware.RAM.Free
}
info.GPUModel = generateAGpuModel()
info.GPURam = generateAGpuRam()
ramTotal, ramUsed := generateARam()
info.MacAddress = w.worker.info.Hardware.NET.Mac
info.RamTotal = ramTotal
info.RamUsed = ramUsed
}
// todo: record the create time and bootup time
info.CreateTime = time.Now().Add(time.Hour * 24 * 4).Unix()
info.BootupTime = time.Now().Add(time.Hour * 12).Unix()
info.Workload = rand.Intn(500) + 1000
info.MacAddress = generateMac()
return json.Marshal(info)
}
......@@ -9,7 +9,6 @@ import (
"github.com/odysseus/mogo/operator"
"github.com/odysseus/mogo/types"
"github.com/odysseus/nodemanager/config"
omanager "github.com/odysseus/odysseus-protocol/gen/proto/go/nodemanager/v2"
log "github.com/sirupsen/logrus"
"go.mongodb.org/mongo-driver/mongo"
"strconv"
......@@ -17,24 +16,10 @@ import (
"time"
)
// UpdateWorkerUsageInfo caches the worker's latest usage report in redis
// under the worker-usage key, with no expiry.
// todo: update usage info to mogo.
func (wm *WorkerManager) UpdateWorkerUsageInfo(worker *Worker, usageInfo *omanager.DeviceUsageResponse) {
	wm.rdb.Set(context.Background(), workerUsageInfoKey(worker), usageInfo.String(), 0)
}
// UpdateWorkerDeviceInfo caches the worker's latest device-info report in
// redis under the worker-device key, with no expiry.
// todo: update device info to mogo.
func (wm *WorkerManager) UpdateWorkerDeviceInfo(worker *Worker, deviceInfos *omanager.DeviceInfoMessage) {
	wm.rdb.Set(context.Background(), workerDeviceInfoKey(worker), deviceInfos.String(), 0)
}
// GetWorkerNonce is a stub that always returns (0, nil); real nonce
// tracking has not been implemented.
func (wm *WorkerManager) GetWorkerNonce(worker *Worker) (int, error) {
	return 0, nil
}
func (wm *WorkerManager) updateWorkerInfo(worker *Worker, winfo *operator.WorkerInfo) error {
// 2. update worker running info.
wm.workerRunningOperator.DeleteByWorkerId(context.Background(), worker.WorkerAccount().String())
for _, running := range worker.info.nodeInfo.Models.RunningModels {
for _, running := range worker.info.Models.RunningModels {
id, _ := strconv.Atoi(running.ModelId)
iInfo := &operator.WorkerRunningInfo{
WorkerId: worker.WorkerAccount().String(),
......@@ -52,14 +37,14 @@ func (wm *WorkerManager) updateWorkerInfo(worker *Worker, winfo *operator.Worker
}
// 3. update worker installed info.
wm.workerInstalledOperator.DeleteByWorkerId(context.Background(), worker.WorkerAccount().String())
for _, installed := range worker.info.nodeInfo.Models.InstalledModels {
for _, installed := range worker.info.Models.InstalledModels {
id, _ := strconv.Atoi(installed.ModelId)
iInfo := &operator.WorkerInstalledInfo{
WorkerId: worker.WorkerAccount().String(),
ModelId: id,
}
if len(worker.info.nodeInfo.Hardware.GPU) > 0 {
iInfo.GpuFree = worker.info.nodeInfo.Hardware.GPU[0].MemFree
if len(worker.info.Hardware.GPU) > 0 {
iInfo.GpuFree = worker.info.Hardware.GPU[0].MemFree
}
_, err := wm.workerInstalledOperator.Insert(context.Background(), iInfo)
if err != nil {
......@@ -72,9 +57,9 @@ func (wm *WorkerManager) updateWorkerInfo(worker *Worker, winfo *operator.Worker
}
// 1. update worker info.
winfo.Hardware = types.PbToHardwareInfo(worker.info.nodeInfo.Hardware)
winfo.Models = types.PbToModelInfo(worker.info.nodeInfo.Models)
winfo.NodeInfo = types.PbToNodeInfo(worker.info.nodeInfo.Info)
winfo.Hardware = types.PbToHardwareInfo(worker.info.Hardware)
winfo.Models = types.PbToModelInfo(worker.info.Models)
winfo.NodeInfo = types.PbToNodeInfo(worker.info.Info)
err := wm.workerInfoOperator.UpdateWorker(context.Background(), winfo)
if err != nil {
return err
......@@ -87,9 +72,9 @@ func (wm *WorkerManager) addWorkerInfo(worker *Worker) error {
// 1. add worker info.
_, err := wm.workerInfoOperator.InsertWorker(context.Background(), &operator.WorkerInfo{
WorkerId: worker.WorkerAccount().String(),
NodeInfo: types.PbToNodeInfo(worker.info.nodeInfo.Info),
Models: types.PbToModelInfo(worker.info.nodeInfo.Models),
Hardware: types.PbToHardwareInfo(worker.info.nodeInfo.Hardware),
NodeInfo: types.PbToNodeInfo(worker.info.Info),
Models: types.PbToModelInfo(worker.info.Models),
Hardware: types.PbToHardwareInfo(worker.info.Hardware),
})
if err != nil {
return err
......@@ -97,7 +82,7 @@ func (wm *WorkerManager) addWorkerInfo(worker *Worker) error {
// 2. add worker running info.
for _, running := range worker.info.nodeInfo.Models.RunningModels {
for _, running := range worker.info.Models.RunningModels {
id, _ := strconv.Atoi(running.ModelId)
iInfo := &operator.WorkerRunningInfo{
WorkerId: worker.WorkerAccount().String(),
......@@ -115,14 +100,14 @@ func (wm *WorkerManager) addWorkerInfo(worker *Worker) error {
}
// 3. add worker installed info.
for _, installed := range worker.info.nodeInfo.Models.InstalledModels {
for _, installed := range worker.info.Models.InstalledModels {
id, _ := strconv.Atoi(installed.ModelId)
iInfo := &operator.WorkerInstalledInfo{
WorkerId: worker.WorkerAccount().String(),
ModelId: id,
}
if len(worker.info.nodeInfo.Hardware.GPU) > 0 {
iInfo.GpuFree = worker.info.nodeInfo.Hardware.GPU[0].MemFree
if len(worker.info.Hardware.GPU) > 0 {
iInfo.GpuFree = worker.info.Hardware.GPU[0].MemFree
}
_, err := wm.workerInstalledOperator.Insert(context.Background(), iInfo)
if err != nil {
......@@ -196,8 +181,8 @@ func (wm *WorkerManager) InActiveWorker(worker *Worker) {
if list, err := wm.rdb.SMembers(context.Background(), workerStatusKey(worker)).Result(); err == nil && len(list) == 0 {
wm.rdb.Del(context.Background(), workerStatusKey(worker))
if worker.info.nodeInfo != nil {
wm.delWorkerFromWhiteListSet(worker, worker.info.nodeInfo.Info.BenefitAddress)
if worker.info != nil {
wm.delWorkerFromWhiteListSet(worker, worker.info.Info.BenefitAddress)
// delete worker info from mogo.
n, err := wm.workerInfoOperator.DeleteByWorkerId(context.Background(), worker.WorkerAccount().String())
if err != nil {
......@@ -260,26 +245,6 @@ func (wm *WorkerManager) checkWhiteList(worker *Worker, benefit string) error {
}
}
// workerResourceInfoKey returns the redis key for a worker's resource info.
func workerResourceInfoKey(w *Worker) string {
	return config.WORKER_RESOURCE_INFO_PREFIX + w.workerAddr
}
// workerBootedResourceInfoKey returns the redis key for a worker's
// boot-time resource info.
func workerBootedResourceInfoKey(w *Worker) string {
	return config.WORKER_BOOTED_RESOURCE_INFO_PREFIX + w.workerAddr
}
// workerDeviceInfoKey returns the redis key for a worker's device info.
func workerDeviceInfoKey(w *Worker) string {
	return config.WORKER_DEVICE_INFO_PREFIX + w.workerAddr
}
// workerUsageInfoKey returns the redis key for a worker's usage info.
func workerUsageInfoKey(w *Worker) string {
	return config.WORKER_USAGE_INFO_PREFIX + w.workerAddr
}
// workerDeviceStatusInfoKey returns the redis key for a worker's device
// status info.
func workerDeviceStatusInfoKey(w *Worker) string {
	return config.WORKER_DEVICE_STATUS_PREFIX + w.workerAddr
}
// workerSetsKey returns the redis key for the set of workers sharing the
// given benefit address.
func workerSetsKey(benefit string) string {
	return config.WORKER_SETS_PREFIX + benefit
}
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment