Commit 5083c77d authored by vicotor's avatar vicotor

update nm

parent 9b00464f
......@@ -3,7 +3,7 @@ package server
import (
"errors"
odysseus "github.com/odysseus/odysseus-protocol/gen/proto/go/base/v1"
omanager "github.com/odysseus/odysseus-protocol/gen/proto/go/nodemanager/v1"
omanager "github.com/odysseus/odysseus-protocol/gen/proto/go/nodemanager/v2"
log "github.com/sirupsen/logrus"
"sync"
"time"
......
......@@ -11,7 +11,7 @@ import (
"github.com/odysseus/nodemanager/nmregister"
"github.com/odysseus/nodemanager/utils"
basev1 "github.com/odysseus/odysseus-protocol/gen/proto/go/base/v1"
omanager "github.com/odysseus/odysseus-protocol/gen/proto/go/nodemanager/v1"
omanager "github.com/odysseus/odysseus-protocol/gen/proto/go/nodemanager/v2"
"github.com/odysseus/service-registry/query"
"github.com/odysseus/service-registry/registry"
"github.com/redis/go-redis/v9"
......@@ -139,18 +139,6 @@ func (n *Node) PostProof(proof *basev1.TaskProof) {
n.taskProofCh <- proof
}
func (n *Node) Start() error {
go n.registry.Start()
go n.register.Start()
go n.postLoop()
n.SetStatus("running")
if err := n.apiStart(); err != nil {
return err
}
return nil
}
func (n *Node) postLoop() {
for {
select {
......@@ -172,6 +160,22 @@ func (n *Node) postLoop() {
}
}
func (n *Node) SetStatus(status string) {
n.register.SetStatus(status)
}
func (n *Node) Start() error {
go n.registry.Start()
go n.register.Start()
go n.postLoop()
n.SetStatus("running")
if err := n.apiStart(); err != nil {
return err
}
return nil
}
func (n *Node) Stop() {
n.registry.Clear()
n.registry.Stop()
......@@ -181,6 +185,10 @@ func (n *Node) Stop() {
close(n.taskProofCh)
}
func (n *Node) SetStatus(status string) {
n.register.SetStatus(status)
func (n *Node) PayForFee(uid int64, fee int64) error {
return n.cache.PayforFee(uid, fee)
}
func (n *Node) Cache() *cachedata.CacheData {
return n.cache
}
......@@ -4,7 +4,7 @@ import (
"context"
"errors"
"github.com/odysseus/nodemanager/utils"
omanager "github.com/odysseus/odysseus-protocol/gen/proto/go/nodemanager/v1"
omanager "github.com/odysseus/odysseus-protocol/gen/proto/go/nodemanager/v2"
log "github.com/sirupsen/logrus"
"strconv"
"strings"
......
......@@ -5,7 +5,7 @@ import (
"github.com/ethereum/go-ethereum/crypto"
"github.com/odysseus/nodemanager/utils"
odysseus "github.com/odysseus/odysseus-protocol/gen/proto/go/base/v1"
omanager "github.com/odysseus/odysseus-protocol/gen/proto/go/nodemanager/v1"
omanager "github.com/odysseus/odysseus-protocol/gen/proto/go/nodemanager/v2"
log "github.com/sirupsen/logrus"
"math/big"
"time"
......@@ -89,7 +89,7 @@ func (wm *WorkerManager) computeTaskResult(worker *Worker, task *odysseus.TaskCo
// verify miner_signature
// miner_signature = sign(hash((task_id+hash(task_param)+hash(task_result)))
verified := utils.VerifySignature(utils.CombineBytes([]byte(result.TaskId), paramHash[:], resultHash[:]),
result.MinerSignature, utils.FromHex(worker.info.nodeInfo.MinerPubkey))
result.MinerSignature, utils.FromHex(worker.info.nodeInfo.Info.MinerPubkey))
log.WithFields(log.Fields{
"task-id": task.TaskId,
"minerSignatureVerify": verified,
......@@ -181,7 +181,7 @@ func (wm *WorkerManager) standardTaskResult(worker *Worker, task *odysseus.TaskC
// verify miner_signature
// miner_signature = sign(hash((task_id+hash(task_param)+hash(task_result)))
verified := utils.VerifySignature(utils.CombineBytes([]byte(result.TaskId), paramHash[:], resultHash[:]),
result.MinerSignature, utils.FromHex(worker.info.nodeInfo.MinerPubkey))
result.MinerSignature, utils.FromHex(worker.info.nodeInfo.Info.MinerPubkey))
log.WithFields(log.Fields{
"task-id": task.TaskId,
"minerSignatureVerify": verified,
......
......@@ -4,17 +4,17 @@ import (
"errors"
"github.com/ethereum/go-ethereum/common"
lru "github.com/hashicorp/golang-lru"
omanager "github.com/odysseus/odysseus-protocol/gen/proto/go/nodemanager/v1"
omanager "github.com/odysseus/odysseus-protocol/gen/proto/go/nodemanager/v2"
log "github.com/sirupsen/logrus"
"time"
)
type workerInfo struct {
nodeInfo *omanager.NodeInfoResponse
deviceUsageInfo []*omanager.DeviceUsage
deviceInfo *omanager.DeviceInfoMessage
deviceStatusInfo *omanager.StatusResponse
resourceInfo *omanager.SubmitResourceMap
}
type workerUsageInfo struct {
hwUsage *omanager.HardwareUsage
}
type sendMsgCallback struct {
......@@ -37,6 +37,7 @@ type Worker struct {
addFirstSucceed bool
info workerInfo
usage workerUsageInfo
workerAddr string // worker address from public-key
deviceInfoHash []byte
recentTask *lru.ARCCache
......@@ -48,7 +49,7 @@ type Worker struct {
func (w *Worker) ProfitAccount() common.Address {
if w.info.nodeInfo != nil {
return common.HexToAddress(w.info.nodeInfo.BenefitAddress)
return common.HexToAddress(w.info.nodeInfo.Info.BenefitAddress)
}
return common.Address{}
}
......@@ -121,3 +122,22 @@ func (w *Worker) RecvMessage() {
w.msgCh <- wmsg
}
}
func (w *Worker) ModelOperate(info interface{}, operate omanager.ModelOperateType) *omanager.ManagerMessage_ModelOperateRequest {
request := &omanager.ManagerMessage_ModelOperateRequest{
ModelOperateRequest: &omanager.ModelOperateRequest{
ModelOperates: []*omanager.ModelOperate{
{
ModelId: "",
ImageName: "",
Username: "",
Password: "",
Cmd: "",
Operate: operate,
},
},
},
}
return request
}
......@@ -9,11 +9,12 @@ import (
"github.com/golang/protobuf/proto"
"github.com/google/uuid"
lru "github.com/hashicorp/golang-lru"
"github.com/odysseus/cache/cachedata"
"github.com/odysseus/nodemanager/config"
"github.com/odysseus/nodemanager/standardlib"
"github.com/odysseus/nodemanager/utils"
odysseus "github.com/odysseus/odysseus-protocol/gen/proto/go/base/v1"
omanager "github.com/odysseus/odysseus-protocol/gen/proto/go/nodemanager/v1"
omanager "github.com/odysseus/odysseus-protocol/gen/proto/go/nodemanager/v2"
"github.com/odysseus/service-registry/registry"
"github.com/redis/go-redis/v9"
log "github.com/sirupsen/logrus"
......@@ -46,6 +47,14 @@ const (
TASK_TIMEOUT
)
type NodeInterface interface {
PostResult(*odysseus.TaskReceipt)
PostProof(*odysseus.TaskProof)
Sign(hash []byte) ([]byte, error)
Cache() *cachedata.CacheData
PayForFee(uid int64, fee int64) error
}
type WorkerManager struct {
rdb *redis.Client
heartBeat map[int64]int64
......@@ -58,11 +67,11 @@ type WorkerManager struct {
wkRwLock sync.RWMutex
quit chan struct{}
node *Node
node NodeInterface
std *standardlib.StandardTasks
}
func NewWorkerManager(rdb *redis.Client, node *Node) *WorkerManager {
func NewWorkerManager(rdb *redis.Client, node NodeInterface) *WorkerManager {
return &WorkerManager{
heartBeat: make(map[int64]int64),
workerReg: make(map[int64]*registry.Registry),
......@@ -228,12 +237,12 @@ func (wm *WorkerManager) manageWorker(worker *Worker) error {
workerCheckTicker := time.NewTicker(workerCheckDuration)
defer workerCheckTicker.Stop()
statusTicker := time.NewTicker(initialInterval)
defer statusTicker.Stop()
deviceUsageTicker := time.NewTicker(initialInterval)
defer deviceUsageTicker.Stop()
gpuUsageTicker := time.NewTicker(initialInterval)
defer gpuUsageTicker.Stop()
worker.status = "connected"
defer func() {
......@@ -270,11 +279,7 @@ func (wm *WorkerManager) manageWorker(worker *Worker) error {
nodeinfoTicker.Reset(time.Hour * 24)
}
if worker.info.deviceStatusInfo != nil {
statusTicker.Reset(time.Second * time.Duration(tickerConf.StatusTicker))
}
if worker.info.deviceUsageInfo != nil {
if worker.usage.hwUsage != nil {
deviceUsageTicker.Reset(time.Second * time.Duration(tickerConf.DeviceUsageTicker))
}
......@@ -309,22 +314,20 @@ func (wm *WorkerManager) manageWorker(worker *Worker) error {
if !worker.registed {
continue
}
deviceUsage := new(omanager.ManagerMessage_DeviceUsage)
deviceUsage.DeviceUsage = &omanager.DeviceUsageRequest{}
deviceUsage := new(omanager.ManagerMessage_DeviceUsageRequest)
deviceUsage.DeviceUsageRequest = &omanager.DeviceUsageRequest{}
msg.Message = deviceUsage
callback = func(err error) bool {
return true
}
case <-statusTicker.C:
// if worker is not registed to me, ignore device status info.
if !worker.registed {
continue
}
status := new(omanager.ManagerMessage_StatusRequest)
status.StatusRequest = &omanager.StatusRequest{}
msg.Message = status
case <-gpuUsageTicker.C:
gpu := new(omanager.ManagerMessage_GpuUsageRequest)
gpu.GpuUsageRequest = &omanager.GPUUsageRequest{}
msg.Message = gpu
callback = func(err error) bool {
log.WithField("worker", worker.uuid).Debug("send heart beat to worker")
return true
}
......@@ -333,8 +336,8 @@ func (wm *WorkerManager) manageWorker(worker *Worker) error {
return nil
}
task := dtask.task
taskMsg := new(omanager.ManagerMessage_PushTaskMessage)
taskMsg.PushTaskMessage = &omanager.PushTaskMessage{
taskMsg := new(omanager.ManagerMessage_PushTask)
taskMsg.PushTask = &omanager.PushTaskMessage{
TaskId: task.TaskId,
TaskType: task.TaskType,
TaskKind: task.TaskKind,
......@@ -465,18 +468,23 @@ func (wm *WorkerManager) handleWorkerMsg(worker *Worker) {
"heartBeat": time.Now().Unix() - int64(msg.HeartbeatResponse.Timestamp),
}).Debug("receive worker heartbeat")
case *omanager.WorkerMessage_NodeInfo:
matched, err := regexp.MatchString("((2(5[0-5]|[0-4]\\d))|[0-1]?\\d{1,2})(\\.((2(5[0-5]|[0-4]\\d))|[0-1]?\\d{1,2})){3}", msg.NodeInfo.DeviceIp)
nodeinfo := msg.NodeInfo
if nodeinfo.Hardware != nil && nodeinfo.Hardware.NET != nil {
matched, err := regexp.MatchString("((2(5[0-5]|[0-4]\\d))|[0-1]?\\d{1,2})(\\.((2(5[0-5]|[0-4]\\d))|[0-1]?\\d{1,2})){3}",
nodeinfo.Hardware.NET.Ip)
if err != nil {
l.WithField("nodeinfo.ip", msg.NodeInfo.DeviceIp).Error("ip匹配出现错误")
l.WithField("nodeinfo.ip", nodeinfo.Hardware.NET.Ip).Error("ip匹配出现错误")
//return
}
if !matched {
msg.NodeInfo.DeviceIp = ""
// clean ip content if not match.
nodeinfo.Hardware.NET.Ip = ""
}
}
worker.info.nodeInfo = msg.NodeInfo
var addr = ""
if pubkey, err := utils.HexToPubkey(msg.NodeInfo.MinerPubkey); err != nil {
if pubkey, err := utils.HexToPubkey(nodeinfo.Info.MinerPubkey); err != nil {
l.WithFields(log.Fields{
"worker-addr": worker.workerAddr,
"error": err,
......@@ -505,29 +513,6 @@ func (wm *WorkerManager) handleWorkerMsg(worker *Worker) {
// update new worker.
wm.SetWorkerAddr(worker, addr)
case *omanager.WorkerMessage_Status:
if !worker.registed {
continue
}
worker.info.deviceStatusInfo = msg.Status
l.WithFields(log.Fields{
"worker-addr": worker.workerAddr,
}).Debugf("receive worker status:0x%x", msg.Status.DeviceStatus)
wm.UpdateWorkerDeviceStatusInfo(worker, msg.Status.DeviceStatus)
case *omanager.WorkerMessage_ResourceMap:
l.WithFields(log.Fields{
"registed": worker.registed,
"worker-addr": worker.workerAddr,
}).Infof("receive worker resource map:%v", msg.ResourceMap)
if !worker.registed {
continue
}
worker.info.resourceInfo = msg.ResourceMap
wm.UpdateWorkerResourceInfo(worker, msg.ResourceMap.ResourceMap)
wm.UpdateWorkerBootedResourceInfo(worker, msg.ResourceMap.BootupMap)
case *omanager.WorkerMessage_FetchStandardTask:
if worker.info.nodeInfo == nil {
continue
......@@ -586,7 +571,7 @@ func (wm *WorkerManager) handleWorkerMsg(worker *Worker) {
}
infoHash = sha3.Sum256(infoData)
worker.info.deviceInfo = msg.DeviceInfo
worker.info.nodeInfo.Hardware = msg.DeviceInfo.Hardware
if worker.registed && worker.addFirstSucceed == false {
wm.AddWorkerToQueue(worker)
......@@ -605,10 +590,11 @@ func (wm *WorkerManager) handleWorkerMsg(worker *Worker) {
usageData, _ := json.Marshal(msg.DeviceUsage)
wm.UpdateWorkerUsageInfo(worker, string(usageData))
worker.info.deviceUsageInfo = msg.DeviceUsage.Usage
worker.usage.hwUsage = msg.DeviceUsage.Usage
l.WithFields(log.Fields{
"worker-addr": worker.workerAddr,
}).Debugf("receive worker device usage:%v", msg.DeviceUsage.Usage)
case *omanager.WorkerMessage_RegisteMessage:
if worker.registed {
continue
......@@ -617,14 +603,17 @@ func (wm *WorkerManager) handleWorkerMsg(worker *Worker) {
"worker-addr": worker.workerAddr,
}).Debug("receive registed message")
// todo: verify signature
info := msg.RegisteMessage.Info
{
hardware := msg.RegisteMessage.Hardware
sig := msg.RegisteMessage.DeviceSignature
data := utils.CombineBytes(bytes.NewBufferString(msg.RegisteMessage.DeviceIp).Bytes(),
bytes.NewBufferString(msg.RegisteMessage.MinerPubkey).Bytes(),
bytes.NewBufferString(msg.RegisteMessage.BenefitAddress).Bytes(),
data := utils.CombineBytes([]byte(info.String()),
[]byte(hardware.String()),
[]byte(msg.RegisteMessage.Models.String()),
big.NewInt(int64(msg.RegisteMessage.Timestamp)).Bytes())
if !utils.VerifySignature(data, sig, utils.FromHex(msg.RegisteMessage.MinerPubkey)) {
if !utils.VerifySignature(data, sig, utils.FromHex(info.MinerPubkey)) {
l.WithFields(log.Fields{
"worker-addr": worker.workerAddr,
}).Error("verify device signature failed")
......@@ -640,7 +629,7 @@ func (wm *WorkerManager) handleWorkerMsg(worker *Worker) {
}
}
if pubkey, err := utils.HexToPubkey(msg.RegisteMessage.MinerPubkey); err != nil {
if pubkey, err := utils.HexToPubkey(info.MinerPubkey); err != nil {
l.WithFields(log.Fields{
"worker-addr": worker.workerAddr,
"error": err,
......@@ -657,25 +646,26 @@ func (wm *WorkerManager) handleWorkerMsg(worker *Worker) {
worker.workerAddr = addr
}
worker.registed = true
matched, err := regexp.MatchString("((2(5[0-5]|[0-4]\\d))|[0-1]?\\d{1,2})(\\.((2(5[0-5]|[0-4]\\d))|[0-1]?\\d{1,2})){3}", msg.RegisteMessage.DeviceIp)
matched, err := regexp.MatchString("((2(5[0-5]|[0-4]\\d))|[0-1]?\\d{1,2})(\\.((2(5[0-5]|[0-4]\\d))|[0-1]?\\d{1,2})){3}",
msg.RegisteMessage.Hardware.NET.Ip)
if err != nil {
log.WithField("registed.ip", msg.RegisteMessage.DeviceIp).Error("ip匹配出现错误")
log.WithField("registed.ip", msg.RegisteMessage.Hardware.NET.Ip).Error("ip匹配出现错误")
}
if !matched {
msg.RegisteMessage.DeviceIp = ""
msg.RegisteMessage.Hardware.NET.Ip = ""
}
worker.info.nodeInfo = &omanager.NodeInfoResponse{
MinerPubkey: msg.RegisteMessage.MinerPubkey,
BenefitAddress: msg.RegisteMessage.BenefitAddress,
DeviceIp: msg.RegisteMessage.DeviceIp,
Info: msg.RegisteMessage.Info,
Hardware: msg.RegisteMessage.Hardware,
Models: msg.RegisteMessage.Models,
}
wm.SetWorkerAddr(worker, worker.workerAddr)
if err := wm.checkWhiteList(worker, worker.info.nodeInfo.BenefitAddress); err != nil {
if err := wm.checkWhiteList(worker, info.BenefitAddress); err != nil {
worker.quit <- err
return
} else {
wm.addWorkerToSets(worker, worker.info.nodeInfo.BenefitAddress)
wm.addWorkerToSets(worker, info.BenefitAddress)
}
wreg := workerRegistry{
......@@ -708,7 +698,7 @@ func (wm *WorkerManager) Payment(task *odysseus.TaskContent) error {
fee, _ := strconv.ParseInt(task.TaskFee, 10, 64)
uid, _ := strconv.ParseInt(task.TaskUid, 10, 64)
err := wm.node.cache.PayforFee(uid, fee)
err := wm.node.PayForFee(uid, fee)
if err != nil {
return err
}
......@@ -762,7 +752,7 @@ func (wm *WorkerManager) makeTaskProof(worker *Worker, task *odysseus.TaskConten
TaskContainerSignature: hex.EncodeToString(t.containerSignature),
TaskMinerSignature: hex.EncodeToString(t.minerSignature),
TaskWorkerAccount: worker.workerAddr,
TaskProfitAccount: worker.info.nodeInfo.BenefitAddress,
TaskProfitAccount: worker.info.nodeInfo.Info.BenefitAddress,
}
return proof
}
......@@ -7,7 +7,6 @@ import (
"github.com/odysseus/service-registry/query"
"github.com/odysseus/service-registry/registry"
"math/rand"
"strings"
"time"
)
......@@ -75,8 +74,8 @@ func (w workerRegistry) DetailInfo() (json.RawMessage, error) {
TaskList: taskList,
}
if w.worker.info.nodeInfo != nil {
info.BenefitAddress = w.worker.info.nodeInfo.BenefitAddress
info.IP = w.worker.info.nodeInfo.DeviceIp
info.BenefitAddress = w.worker.info.nodeInfo.Info.BenefitAddress
info.IP = w.worker.info.nodeInfo.Hardware.NET.Ip
}
nmList, _ := w.wm.WorkerNmList(w.worker)
......@@ -89,15 +88,9 @@ func (w workerRegistry) DetailInfo() (json.RawMessage, error) {
info.HearBeat = w.wm.GetHeartBeat(w.worker.uuid) * 1000 // to ms
info.MinerAddress = w.worker.workerAddr
info.Nonce = int64(w.worker.nonce)
if w.worker.info.deviceInfo != nil {
for i := 0; i < len(w.worker.info.deviceInfo.Devices); i++ {
if strings.HasPrefix(w.worker.info.deviceInfo.Devices[i].DeviceType, "cpu") {
if info.CpuModel == "" {
info.CpuModel = w.worker.info.deviceInfo.Devices[i].DeviceModel
}
info.CpuCore++
}
}
if w.worker.info.nodeInfo != nil {
info.CpuModel = w.worker.info.nodeInfo.Hardware.CPU.Model
info.CpuCore = int(w.worker.info.nodeInfo.Hardware.CPU.Cores)
if info.CpuCore == 0 {
info.CpuModel = "Intel(R) Xeon(R) CPU E5-2699 v4 @ 2.20GHz"
info.CpuCore = 8
......
......@@ -65,13 +65,10 @@ func (wm *WorkerManager) IncrWorkerNonce(worker *Worker) (int, error) {
func (wm *WorkerManager) AddWorkerFirst(worker *Worker) error {
log.WithField("worker", worker.workerAddr).Info("add worker first time.")
wm.UpdateWorkerActive(worker)
for _, device := range worker.info.deviceInfo.Devices {
if !strings.HasPrefix(device.DeviceType, "gpu") {
continue
}
for _, gpu := range worker.info.nodeInfo.Hardware.GPU {
// add device to redis
priority := 0
_ = device // todo: set priority with device info.
_ = gpu // todo: set priority with device info.
for m := 0; m < config.GetConfig().GetWorkerMultiple(); m++ {
// add worker to redis queue
if err := wm.rdb.RPush(context.Background(), config.WORKER_QUEUE_PREFIX+strconv.Itoa(priority), workerUid(worker)).Err(); err != nil {
......@@ -187,7 +184,7 @@ func (wm *WorkerManager) InActiveWorker(worker *Worker) {
wm.rdb.Del(context.Background(), workerResourceInfoKey(worker))
wm.rdb.Del(context.Background(), workerBootedResourceInfoKey(worker))
if worker.info.nodeInfo != nil {
wm.rmWorkerFromSets(worker, worker.info.nodeInfo.BenefitAddress)
wm.rmWorkerFromSets(worker, worker.info.nodeInfo.Info.BenefitAddress)
}
}
}
......@@ -217,7 +214,7 @@ func (wm *WorkerManager) setWorkerLastTaskTime(worker *Worker, tm int64) error {
}
func (wm *WorkerManager) checkWhiteList(worker *Worker, benefit string) error {
wh, err := wm.node.cache.GetWhWithAddr(benefit)
wh, err := wm.node.Cache().GetWhWithAddr(benefit)
if err != nil {
log.WithFields(log.Fields{
"worker": worker.uuid,
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment