Commit f3f5c792 authored by vicotor's avatar vicotor

update protocol

parent 14bc5d34
...@@ -115,7 +115,7 @@ func (d *dispatchTask) finalize(wm *WorkerManager) { ...@@ -115,7 +115,7 @@ func (d *dispatchTask) finalize(wm *WorkerManager) {
task := d.task task := d.task
if task.TaskKind != odysseus.TaskKind_StandardTask && d.worker.online == true { if task.TaskKind != odysseus.TaskKind_StandardTask && d.worker.online == true {
_ = wm.AddWorkerSingle(d.worker)
} }
_, err := wm.taskResult(d.worker, task, result) _, err := wm.taskResult(d.worker, task, result)
......
...@@ -6,7 +6,6 @@ import ( ...@@ -6,7 +6,6 @@ import (
"github.com/odysseus/nodemanager/utils" "github.com/odysseus/nodemanager/utils"
omanager "github.com/odysseus/odysseus-protocol/gen/proto/go/nodemanager/v2" omanager "github.com/odysseus/odysseus-protocol/gen/proto/go/nodemanager/v2"
log "github.com/sirupsen/logrus" log "github.com/sirupsen/logrus"
"strconv"
"strings" "strings"
) )
...@@ -64,13 +63,6 @@ func (n *NodeManagerService) DispatchTask(ctx context.Context, request *omanager ...@@ -64,13 +63,6 @@ func (n *NodeManagerService) DispatchTask(ctx context.Context, request *omanager
if worker.online == false { if worker.online == false {
return nil, errors.New("worker offline") return nil, errors.New("worker offline")
} }
{
nonceds := strings.Split(mids[1], ":")
nonce, _ := strconv.ParseInt(nonceds[0], 10, 64)
if nonce < int64(worker.nonce) {
return nil, errors.New("expired worker nonce")
}
}
dtask := newDispatchTask(worker, request.TaskData) dtask := newDispatchTask(worker, request.TaskData)
......
...@@ -3,7 +3,6 @@ package server ...@@ -3,7 +3,6 @@ package server
import ( import (
"bytes" "bytes"
"encoding/hex" "encoding/hex"
"encoding/json"
"errors" "errors"
"fmt" "fmt"
"github.com/golang/protobuf/proto" "github.com/golang/protobuf/proto"
...@@ -285,15 +284,17 @@ func (wm *WorkerManager) manageWorker(worker *Worker) error { ...@@ -285,15 +284,17 @@ func (wm *WorkerManager) manageWorker(worker *Worker) error {
case <-workerCheckTicker.C: case <-workerCheckTicker.C:
if worker.info.nodeInfo != nil { if worker.info.nodeInfo != nil {
//nodeinfoTicker.Reset(time.Hour * 24) nodeinfoTicker.Reset(time.Minute * 30)
} }
if worker.usage.hwUsage != nil { if worker.usage.hwUsage != nil {
deviceUsageTicker.Reset(time.Second * time.Duration(tickerConf.DeviceUsageTicker)) deviceUsageTicker.Reset(time.Second * time.Duration(tickerConf.DeviceUsageTicker))
} }
if worker.registed && worker.addFirstSucceed == false && len(worker.deviceInfoHash) > 0 { if worker.registed && worker.addFirstSucceed == false {
wm.AddWorkerToQueue(worker) if err := wm.AddWorker(worker); err == nil {
worker.addFirstSucceed = true
}
} }
wm.UpdateWorkerActive(worker) wm.UpdateWorkerActive(worker)
...@@ -477,6 +478,7 @@ func (wm *WorkerManager) handleWorkerMsg(worker *Worker) { ...@@ -477,6 +478,7 @@ func (wm *WorkerManager) handleWorkerMsg(worker *Worker) {
"heartBeat": time.Now().Unix() - int64(msg.HeartbeatResponse.Timestamp), "heartBeat": time.Now().Unix() - int64(msg.HeartbeatResponse.Timestamp),
}).Debug("receive worker heartbeat") }).Debug("receive worker heartbeat")
case *omanager.WorkerMessage_NodeInfo: case *omanager.WorkerMessage_NodeInfo:
// todo: remove this message.
nodeinfo := msg.NodeInfo nodeinfo := msg.NodeInfo
log.WithField("worker-addr", worker.workerAddr).Debugf("receive worker node info:%v", nodeinfo) log.WithField("worker-addr", worker.workerAddr).Debugf("receive worker node info:%v", nodeinfo)
if nodeinfo.Hardware != nil && nodeinfo.Hardware.NET != nil { if nodeinfo.Hardware != nil && nodeinfo.Hardware.NET != nil {
...@@ -566,29 +568,15 @@ func (wm *WorkerManager) handleWorkerMsg(worker *Worker) { ...@@ -566,29 +568,15 @@ func (wm *WorkerManager) handleWorkerMsg(worker *Worker) {
// ignore the info. // ignore the info.
continue continue
} }
// todo: verify signature
{ {
var infoHash [32]byte var infoHash [32]byte
infoData, err := json.Marshal(msg.DeviceInfo) infoHash = sha3.Sum256([]byte(msg.DeviceInfo.String()))
if err != nil { // update local cache.
l.WithFields(log.Fields{
"worker-addr": worker.workerAddr,
"error": err,
}).Error("marshal device info failed")
}
if len(infoData) == 0 {
continue
}
infoHash = sha3.Sum256(infoData)
worker.info.nodeInfo.Hardware = msg.DeviceInfo.Hardware worker.info.nodeInfo.Hardware = msg.DeviceInfo.Hardware
if worker.registed && worker.addFirstSucceed == false {
wm.AddWorkerToQueue(worker)
}
// check device info changed, and update to cache. // check device info changed, and update to cache.
if bytes.Compare(infoHash[:], worker.deviceInfoHash) != 0 { if bytes.Compare(infoHash[:], worker.deviceInfoHash) != 0 {
wm.UpdateWorkerDeviceInfo(worker, string(infoData)) wm.UpdateWorkerDeviceInfo(worker, msg.DeviceInfo)
} }
worker.deviceInfoHash = infoHash[:] worker.deviceInfoHash = infoHash[:]
} }
...@@ -597,8 +585,7 @@ func (wm *WorkerManager) handleWorkerMsg(worker *Worker) { ...@@ -597,8 +585,7 @@ func (wm *WorkerManager) handleWorkerMsg(worker *Worker) {
if !worker.registed { if !worker.registed {
continue continue
} }
usageData, _ := json.Marshal(msg.DeviceUsage) wm.UpdateWorkerUsageInfo(worker, msg.DeviceUsage)
wm.UpdateWorkerUsageInfo(worker, string(usageData))
worker.usage.hwUsage = msg.DeviceUsage.Usage worker.usage.hwUsage = msg.DeviceUsage.Usage
l.WithFields(log.Fields{ l.WithFields(log.Fields{
...@@ -613,6 +600,7 @@ func (wm *WorkerManager) handleWorkerMsg(worker *Worker) { ...@@ -613,6 +600,7 @@ func (wm *WorkerManager) handleWorkerMsg(worker *Worker) {
"worker-addr": worker.workerAddr, "worker-addr": worker.workerAddr,
"model count": len(msg.AddModelRunning.Models), "model count": len(msg.AddModelRunning.Models),
}).Debugf("receive worker add model running:%v", msg.AddModelRunning.Models) }).Debugf("receive worker add model running:%v", msg.AddModelRunning.Models)
// todo: add worker running model.
case *omanager.WorkerMessage_DelModeRunning: case *omanager.WorkerMessage_DelModeRunning:
if !worker.registed { if !worker.registed {
...@@ -622,6 +610,7 @@ func (wm *WorkerManager) handleWorkerMsg(worker *Worker) { ...@@ -622,6 +610,7 @@ func (wm *WorkerManager) handleWorkerMsg(worker *Worker) {
"worker-addr": worker.workerAddr, "worker-addr": worker.workerAddr,
"model count": len(msg.DelModeRunning.ModelIds), "model count": len(msg.DelModeRunning.ModelIds),
}).Debugf("receive worker del model running:%v", msg.DelModeRunning.ModelIds) }).Debugf("receive worker del model running:%v", msg.DelModeRunning.ModelIds)
// todo: del worker running model with model_id.
case *omanager.WorkerMessage_AddModelInstalled: case *omanager.WorkerMessage_AddModelInstalled:
if !worker.registed { if !worker.registed {
...@@ -631,6 +620,7 @@ func (wm *WorkerManager) handleWorkerMsg(worker *Worker) { ...@@ -631,6 +620,7 @@ func (wm *WorkerManager) handleWorkerMsg(worker *Worker) {
"worker-addr": worker.workerAddr, "worker-addr": worker.workerAddr,
"model count": len(msg.AddModelInstalled.Models), "model count": len(msg.AddModelInstalled.Models),
}).Debugf("receive worker add model installed:%v", msg.AddModelInstalled.Models) }).Debugf("receive worker add model installed:%v", msg.AddModelInstalled.Models)
// todo: add worker installed model with model_id.
case *omanager.WorkerMessage_DelModelInstalled: case *omanager.WorkerMessage_DelModelInstalled:
if !worker.registed { if !worker.registed {
...@@ -640,6 +630,7 @@ func (wm *WorkerManager) handleWorkerMsg(worker *Worker) { ...@@ -640,6 +630,7 @@ func (wm *WorkerManager) handleWorkerMsg(worker *Worker) {
"worker-addr": worker.workerAddr, "worker-addr": worker.workerAddr,
"model count": len(msg.DelModelInstalled.ModelIds), "model count": len(msg.DelModelInstalled.ModelIds),
}).Debugf("receive worker del model installed:%v", msg.DelModelInstalled.ModelIds) }).Debugf("receive worker del model installed:%v", msg.DelModelInstalled.ModelIds)
// todo: del worker installed model with model_id.
case *omanager.WorkerMessage_InstalledModelStatus: case *omanager.WorkerMessage_InstalledModelStatus:
if !worker.registed { if !worker.registed {
...@@ -650,6 +641,7 @@ func (wm *WorkerManager) handleWorkerMsg(worker *Worker) { ...@@ -650,6 +641,7 @@ func (wm *WorkerManager) handleWorkerMsg(worker *Worker) {
"model": len(msg.InstalledModelStatus.ModelId), "model": len(msg.InstalledModelStatus.ModelId),
"type": "status", "type": "status",
}).Debugf("receive worker installed model status:%v", msg.InstalledModelStatus) }).Debugf("receive worker installed model status:%v", msg.InstalledModelStatus)
// todo: update worker installed model status.
case *omanager.WorkerMessage_RunningModelStatus: case *omanager.WorkerMessage_RunningModelStatus:
if !worker.registed { if !worker.registed {
...@@ -660,6 +652,7 @@ func (wm *WorkerManager) handleWorkerMsg(worker *Worker) { ...@@ -660,6 +652,7 @@ func (wm *WorkerManager) handleWorkerMsg(worker *Worker) {
"model": len(msg.RunningModelStatus.ModelId), "model": len(msg.RunningModelStatus.ModelId),
"type": "status", "type": "status",
}).Debugf("receive worker running model status:%v", msg.RunningModelStatus) }).Debugf("receive worker running model status:%v", msg.RunningModelStatus)
// todo: update worker running model status.
case *omanager.WorkerMessage_GpuUsage: case *omanager.WorkerMessage_GpuUsage:
if !worker.registed { if !worker.registed {
...@@ -669,15 +662,17 @@ func (wm *WorkerManager) handleWorkerMsg(worker *Worker) { ...@@ -669,15 +662,17 @@ func (wm *WorkerManager) handleWorkerMsg(worker *Worker) {
"worker-addr": worker.workerAddr, "worker-addr": worker.workerAddr,
"usage count": len(msg.GpuUsage.Usages), "usage count": len(msg.GpuUsage.Usages),
}).Debugf("receive worker gpu usage:%v", msg.GpuUsage.Usages) }).Debugf("receive worker gpu usage:%v", msg.GpuUsage.Usages)
// todo: update worker gpu usage info.
case *omanager.WorkerMessage_RegisteMessage: case *omanager.WorkerMessage_RegisteMessage:
// 1. do some verify.
if worker.registed { if worker.registed {
continue continue
} }
l.WithFields(log.Fields{ l.WithFields(log.Fields{
"worker-addr": worker.workerAddr, "worker-addr": worker.workerAddr,
}).Debug("receive registed message") }).Debug("receive registed message")
// todo: verify signature // 2. check signature.
info := msg.RegisteMessage.Info info := msg.RegisteMessage.Info
{ {
...@@ -695,20 +690,27 @@ func (wm *WorkerManager) handleWorkerMsg(worker *Worker) { ...@@ -695,20 +690,27 @@ func (wm *WorkerManager) handleWorkerMsg(worker *Worker) {
worker.quit <- ErrInvalidMessageValue worker.quit <- ErrInvalidMessageValue
return return
} }
if time.Now().Unix()-int64(msg.RegisteMessage.Timestamp) > config.GetConfig().GetWorkerSignatureExpiredTime() {
l.WithFields(log.Fields{ }
"worker-addr": worker.workerAddr, // 3. check timestamp not expired.
}).Error("message signature expired") if time.Now().Unix()-int64(msg.RegisteMessage.Timestamp) > config.GetConfig().GetWorkerSignatureExpiredTime() {
worker.quit <- ErrExpiredMsgSignature l.WithFields(log.Fields{
return "worker-addr": worker.workerAddr,
} }).Error("message signature expired")
worker.quit <- ErrExpiredMsgSignature
return
} }
// 4. replace old connection.
if pubkey, err := utils.HexToPubkey(info.MinerPubkey); err != nil { if pubkey, err := utils.HexToPubkey(info.MinerPubkey); err != nil {
l.WithFields(log.Fields{ l.WithFields(log.Fields{
"worker-addr": worker.workerAddr, "worker-addr": worker.workerAddr,
"error": err, "error": err,
}).Error("parse pubkey failed") }).Error("parse pubkey failed")
worker.quit <- ErrInvalidMsgSignature
return
} else { } else {
addr := utils.PubkeyToAddress(pubkey) addr := utils.PubkeyToAddress(pubkey)
if old := wm.GetWorkerByAddr(addr); old != nil { if old := wm.GetWorkerByAddr(addr); old != nil {
...@@ -721,6 +723,7 @@ func (wm *WorkerManager) handleWorkerMsg(worker *Worker) { ...@@ -721,6 +723,7 @@ func (wm *WorkerManager) handleWorkerMsg(worker *Worker) {
worker.workerAddr = addr worker.workerAddr = addr
} }
worker.registed = true worker.registed = true
// 5. check ip address.
matched, err := regexp.MatchString("((2(5[0-5]|[0-4]\\d))|[0-1]?\\d{1,2})(\\.((2(5[0-5]|[0-4]\\d))|[0-1]?\\d{1,2})){3}", matched, err := regexp.MatchString("((2(5[0-5]|[0-4]\\d))|[0-1]?\\d{1,2})(\\.((2(5[0-5]|[0-4]\\d))|[0-1]?\\d{1,2})){3}",
msg.RegisteMessage.Hardware.NET.Ip) msg.RegisteMessage.Hardware.NET.Ip)
if err != nil { if err != nil {
...@@ -735,14 +738,21 @@ func (wm *WorkerManager) handleWorkerMsg(worker *Worker) { ...@@ -735,14 +738,21 @@ func (wm *WorkerManager) handleWorkerMsg(worker *Worker) {
Models: msg.RegisteMessage.Models, Models: msg.RegisteMessage.Models,
} }
wm.SetWorkerAddr(worker, worker.workerAddr) wm.SetWorkerAddr(worker, worker.workerAddr)
// check white list.
if err := wm.checkWhiteList(worker, info.BenefitAddress); err != nil { if err := wm.checkWhiteList(worker, info.BenefitAddress); err != nil {
worker.quit <- err worker.quit <- err
return return
} else { } else {
wm.addWorkerToSets(worker, info.BenefitAddress) wm.addWorkerToWhiteListSet(worker, info.BenefitAddress)
}
// add worker to mogo.
if err := wm.AddWorker(worker); err == nil {
worker.addFirstSucceed = true
wm.UpdateWorkerActive(worker)
} }
// start manage worker.
wreg := workerRegistry{ wreg := workerRegistry{
worker: worker, worker: worker,
wm: wm, wm: wm,
......
...@@ -87,7 +87,7 @@ func (w workerRegistry) DetailInfo() (json.RawMessage, error) { ...@@ -87,7 +87,7 @@ func (w workerRegistry) DetailInfo() (json.RawMessage, error) {
} }
info.HearBeat = w.wm.GetHeartBeat(w.worker.uuid) * 1000 // to ms info.HearBeat = w.wm.GetHeartBeat(w.worker.uuid) * 1000 // to ms
info.MinerAddress = w.worker.workerAddr info.MinerAddress = w.worker.workerAddr
info.Nonce = int64(w.worker.nonce) info.Nonce = 0
if w.worker.info.nodeInfo != nil { if w.worker.info.nodeInfo != nil {
info.CpuModel = w.worker.info.nodeInfo.Hardware.CPU.Model info.CpuModel = w.worker.info.nodeInfo.Hardware.CPU.Model
info.CpuCore = int(w.worker.info.nodeInfo.Hardware.CPU.Cores) info.CpuCore = int(w.worker.info.nodeInfo.Hardware.CPU.Cores)
......
...@@ -2,7 +2,6 @@ package server ...@@ -2,7 +2,6 @@ package server
import ( import (
"context" "context"
"encoding/hex"
"errors" "errors"
"fmt" "fmt"
"github.com/gomodule/redigo/redis" "github.com/gomodule/redigo/redis"
...@@ -10,80 +9,49 @@ import ( ...@@ -10,80 +9,49 @@ import (
"github.com/odysseus/mogo/operator" "github.com/odysseus/mogo/operator"
"github.com/odysseus/mogo/types" "github.com/odysseus/mogo/types"
"github.com/odysseus/nodemanager/config" "github.com/odysseus/nodemanager/config"
omanager "github.com/odysseus/odysseus-protocol/gen/proto/go/nodemanager/v2"
log "github.com/sirupsen/logrus" log "github.com/sirupsen/logrus"
"go.mongodb.org/mongo-driver/mongo"
"strconv" "strconv"
"strings" "strings"
"time" "time"
) )
func (wm *WorkerManager) UpdateWorkerDeviceStatusInfo(worker *Worker, status []byte) { func (wm *WorkerManager) UpdateWorkerUsageInfo(worker *Worker, usageInfo *omanager.DeviceUsageResponse) {
wm.rdb.Set(context.Background(), workerDeviceStatusInfoKey(worker), status, 0) // todo: update usage info to mogo.
wm.rdb.Set(context.Background(), workerUsageInfoKey(worker), usageInfo.String(), 0)
} }
func (wm *WorkerManager) UpdateWorkerUsageInfo(worker *Worker, usageInfo string) { func (wm *WorkerManager) UpdateWorkerDeviceInfo(worker *Worker, deviceInfos *omanager.DeviceInfoMessage) {
wm.rdb.Set(context.Background(), workerUsageInfoKey(worker), usageInfo, 0) // todo: update device info to mogo.
} wm.rdb.Set(context.Background(), workerDeviceInfoKey(worker), deviceInfos.String(), 0)
func (wm *WorkerManager) UpdateWorkerDeviceInfo(worker *Worker, deviceInfos string) {
wm.rdb.Set(context.Background(), workerDeviceInfoKey(worker), deviceInfos, 0)
}
func (wm *WorkerManager) UpdateWorkerResourceInfo(worker *Worker, resourceInfo []byte) {
rstr := hex.EncodeToString(resourceInfo)
log.WithField("resourceinfo", rstr).Infof("update resourceinfo")
wm.rdb.Set(context.Background(), workerResourceInfoKey(worker), rstr, 0)
}
func (wm *WorkerManager) UpdateWorkerBootedResourceInfo(worker *Worker, bootedResourceInfo []byte) {
rstr := hex.EncodeToString(bootedResourceInfo)
log.WithField("resourceinfo", rstr).Infof("update resourceinfo")
wm.rdb.Set(context.Background(), workerBootedResourceInfoKey(worker), rstr, 0)
}
func (wm *WorkerManager) UpdateWorkerNonce(worker *Worker, nonce int) error {
return wm.rdb.Set(context.Background(), workerNonceKey(worker), nonce, 0).Err()
} }
func (wm *WorkerManager) GetWorkerNonce(worker *Worker) (int, error) { func (wm *WorkerManager) GetWorkerNonce(worker *Worker) (int, error) {
if worker.workerAddr != "" { return 0, nil
nonceK := workerNonceKey(worker)
nonce, err := wm.rdb.Get(context.Background(), nonceK).Int()
if err == redis.ErrNil {
nonce = 1
if err = wm.rdb.Set(context.Background(), nonceK, nonce, 0).Err(); err != nil {
return 0, err
}
}
return nonce, nil
}
return 0, errors.New("unkown worker node info")
} }
func (wm *WorkerManager) IncrWorkerNonce(worker *Worker) (int, error) { func (wm *WorkerManager) updateWorkerInfo(worker *Worker, winfo *operator.WorkerInfo) error {
nonce, err := wm.rdb.Incr(context.Background(), workerNonceKey(worker)).Uint64() // 2. update worker running info.
return int(nonce), err wm.workerRunningOperator.DeleteByWorkerId(context.Background(), worker.WorkerAccount().String())
} for _, running := range worker.info.nodeInfo.Models.RunningModels {
id, _ := strconv.Atoi(running.ModelId)
func (wm *WorkerManager) AddWorkerFirst(worker *Worker) error { iInfo := &operator.WorkerRunningInfo{
log.WithField("worker", worker.workerAddr).Info("add worker first time.") WorkerId: worker.WorkerAccount().String(),
wm.UpdateWorkerActive(worker) ModelId: id,
for _, gpu := range worker.info.nodeInfo.Hardware.GPU { ExecTime: int(running.ExecTime),
// add device to redis }
priority := 0 _, err := wm.workerRunningOperator.Insert(context.Background(), iInfo)
_ = gpu // todo: set priority with device info. if err != nil {
for m := 0; m < config.GetConfig().GetWorkerMultiple(); m++ { log.WithFields(log.Fields{
// add worker to redis queue "worker": worker.WorkerAccount().String(),
if err := wm.rdb.RPush(context.Background(), config.WORKER_QUEUE_PREFIX+strconv.Itoa(priority), workerUid(worker)).Err(); err != nil { "model": id,
continue }).WithError(err).Error("insert worker running model info failed")
} continue
} }
} }
_, err := wm.workerInfoOperator.InsertWorker(context.Background(), &operator.WorkerInfo{ // 3. update worker installed info.
WorkerId: worker.WorkerAccount().String(), wm.workerInstalledOperator.DeleteByWorkerId(context.Background(), worker.WorkerAccount().String())
NodeInfo: types.PbToNodeInfo(worker.info.nodeInfo.Info),
Models: types.PbToModelInfo(worker.info.nodeInfo.Models),
Hardware: types.PbToHardwareInfo(worker.info.nodeInfo.Hardware),
})
for _, installed := range worker.info.nodeInfo.Models.InstalledModels { for _, installed := range worker.info.nodeInfo.Models.InstalledModels {
id, _ := strconv.Atoi(installed.ModelId) id, _ := strconv.Atoi(installed.ModelId)
iInfo := &operator.WorkerInstalledInfo{ iInfo := &operator.WorkerInstalledInfo{
...@@ -93,88 +61,104 @@ func (wm *WorkerManager) AddWorkerFirst(worker *Worker) error { ...@@ -93,88 +61,104 @@ func (wm *WorkerManager) AddWorkerFirst(worker *Worker) error {
if len(worker.info.nodeInfo.Hardware.GPU) > 0 { if len(worker.info.nodeInfo.Hardware.GPU) > 0 {
iInfo.GpuFree = worker.info.nodeInfo.Hardware.GPU[0].MemFree iInfo.GpuFree = worker.info.nodeInfo.Hardware.GPU[0].MemFree
} }
wm.workerInstalledOperator.Insert(context.Background(), iInfo) _, err := wm.workerInstalledOperator.Insert(context.Background(), iInfo)
} if err != nil {
for _, running := range worker.info.nodeInfo.Models.RunningModels { log.WithFields(log.Fields{
id, _ := strconv.Atoi(running.ModelId) "worker": worker.WorkerAccount().String(),
iInfo := &operator.WorkerRunningInfo{ "model": id,
WorkerId: worker.WorkerAccount().String(), }).WithError(err).Error("insert worker installed model info failed")
ModelId: id, continue
ExecTime: int(running.ExecTime),
} }
wm.workerRunningOperator.Insert(context.Background(), iInfo)
}
}
// 1. update worker info.
winfo.Hardware = types.PbToHardwareInfo(worker.info.nodeInfo.Hardware)
winfo.Models = types.PbToModelInfo(worker.info.nodeInfo.Models)
winfo.NodeInfo = types.PbToNodeInfo(worker.info.nodeInfo.Info)
err := wm.workerInfoOperator.UpdateWorker(context.Background(), winfo)
if err != nil { if err != nil {
log.WithError(err).Error("insert worker info failed")
return err return err
} }
return nil return nil
} }
func (wm *WorkerManager) AddWorkerToQueue(worker *Worker) { func (wm *WorkerManager) addWorkerInfo(worker *Worker) error {
nonce, err := wm.GetWorkerNonce(worker) // 1. add worker info.
_, err := wm.workerInfoOperator.InsertWorker(context.Background(), &operator.WorkerInfo{
WorkerId: worker.WorkerAccount().String(),
NodeInfo: types.PbToNodeInfo(worker.info.nodeInfo.Info),
Models: types.PbToModelInfo(worker.info.nodeInfo.Models),
Hardware: types.PbToHardwareInfo(worker.info.nodeInfo.Hardware),
})
if err != nil { if err != nil {
log.WithField("worker-addr", worker.workerAddr).Error("get worker nonce failed when get device info") return err
} else { }
// if statekeys not exist, nonce don't change.
nmlist, err := wm.WorkerNmList(worker) // 2. add worker running info.
for _, running := range worker.info.nodeInfo.Models.RunningModels {
id, _ := strconv.Atoi(running.ModelId)
iInfo := &operator.WorkerRunningInfo{
WorkerId: worker.WorkerAccount().String(),
ModelId: id,
ExecTime: int(running.ExecTime),
}
_, err := wm.workerRunningOperator.Insert(context.Background(), iInfo)
if err != nil { if err != nil {
if err == redis.ErrNil { log.WithFields(log.Fields{
wm.UpdateWorkerActive(worker) "worker": worker.WorkerAccount().String(),
} "model": id,
} else { }).WithError(err).Error("insert worker running model info failed")
if len(nmlist) == 0 { continue
// if nmlist is empty, nonce incr.
nonce, err = wm.IncrWorkerNonce(worker)
if err != nil {
log.WithField("worker-addr", worker.workerAddr).Error("incr worker nonce failed when get device info")
}
} else {
// else if nmlist is not empty, clear and add self to it.
worker.nonce = nonce
wm.rdb.Del(context.Background(), workerStatusKey(worker))
wm.UpdateWorkerActive(worker)
}
} }
} }
if err == nil {
worker.nonce = nonce // 3. add worker installed info.
wm.AddWorkerFirst(worker) for _, installed := range worker.info.nodeInfo.Models.InstalledModels {
worker.addFirstSucceed = true id, _ := strconv.Atoi(installed.ModelId)
iInfo := &operator.WorkerInstalledInfo{
WorkerId: worker.WorkerAccount().String(),
ModelId: id,
}
if len(worker.info.nodeInfo.Hardware.GPU) > 0 {
iInfo.GpuFree = worker.info.nodeInfo.Hardware.GPU[0].MemFree
}
_, err := wm.workerInstalledOperator.Insert(context.Background(), iInfo)
if err != nil {
log.WithFields(log.Fields{
"worker": worker.WorkerAccount().String(),
"model": id,
}).WithError(err).Error("insert worker installed model info failed")
continue
}
} }
return nil
} }
func (wm *WorkerManager) AddWorkerSingle(worker *Worker) error { func (wm *WorkerManager) AddWorker(worker *Worker) error {
log.WithField("worker", worker.workerAddr).Info("add worker on back.") // 1. if worker is exist in mogo, update worker info.
wm.UpdateWorkerActive(worker) winfo, err := wm.workerInfoOperator.FindWorkerByWorkerId(context.Background(), worker.WorkerAccount().String())
{ if err != nil {
// add worker to redis queue if err == mongo.ErrNoDocuments {
priority := 0 // create a new
if err := wm.rdb.RPush(context.Background(), config.WORKER_QUEUE_PREFIX+strconv.Itoa(priority), workerUid(worker)).Err(); err != nil { return wm.addWorkerInfo(worker)
log.WithError(err).Error("add worker back to queue failed.")
} else { } else {
log.WithField("worker", worker.workerAddr).Info("add worker back to queue success.") log.WithError(err).Error("find worker info failed")
}
} else {
if winfo != nil {
// update worker info.
return wm.updateWorkerInfo(worker, winfo)
} }
} }
// add worker to redis queue return errors.New("can't replace worker info")
return nil
} }
func (wm *WorkerManager) UpdateWorkerActive(worker *Worker) { func (wm *WorkerManager) UpdateWorkerActive(worker *Worker) {
if !worker.online { if !worker.online {
return return
} }
nonce, err := wm.GetWorkerNonce(worker)
if err != nil {
return
}
if nonce != worker.nonce {
wm.InActiveWorker(worker)
worker.nonce = nonce
}
old := worker.latestNmValue old := worker.latestNmValue
if newNm, err := wm.activeWorker(worker); err != nil { if newNm, err := wm.activeWorker(worker); err != nil {
return return
...@@ -212,12 +196,9 @@ func (wm *WorkerManager) InActiveWorker(worker *Worker) { ...@@ -212,12 +196,9 @@ func (wm *WorkerManager) InActiveWorker(worker *Worker) {
if list, err := wm.rdb.SMembers(context.Background(), workerStatusKey(worker)).Result(); err == nil && len(list) == 0 { if list, err := wm.rdb.SMembers(context.Background(), workerStatusKey(worker)).Result(); err == nil && len(list) == 0 {
wm.rdb.Del(context.Background(), workerStatusKey(worker)) wm.rdb.Del(context.Background(), workerStatusKey(worker))
wm.rdb.Del(context.Background(), workerUsageInfoKey(worker))
wm.rdb.Del(context.Background(), workerDeviceInfoKey(worker))
wm.rdb.Del(context.Background(), workerResourceInfoKey(worker))
wm.rdb.Del(context.Background(), workerBootedResourceInfoKey(worker))
if worker.info.nodeInfo != nil { if worker.info.nodeInfo != nil {
wm.rmWorkerFromSets(worker, worker.info.nodeInfo.Info.BenefitAddress) wm.delWorkerFromWhiteListSet(worker, worker.info.nodeInfo.Info.BenefitAddress)
// delete worker info from mogo.
n, err := wm.workerInfoOperator.DeleteByWorkerId(context.Background(), worker.WorkerAccount().String()) n, err := wm.workerInfoOperator.DeleteByWorkerId(context.Background(), worker.WorkerAccount().String())
if err != nil { if err != nil {
log.WithError(err).Error("delete worker info failed") log.WithError(err).Error("delete worker info failed")
...@@ -227,15 +208,15 @@ func (wm *WorkerManager) InActiveWorker(worker *Worker) { ...@@ -227,15 +208,15 @@ func (wm *WorkerManager) InActiveWorker(worker *Worker) {
} }
} }
func (wm *WorkerManager) addWorkerToSets(worker *Worker, benefit string) { func (wm *WorkerManager) addWorkerToWhiteListSet(worker *Worker, benefit string) {
wm.rdb.SAdd(context.Background(), workerSetsKey(benefit), worker.workerAddr) wm.rdb.SAdd(context.Background(), workerSetsKey(benefit), worker.workerAddr)
} }
func (wm *WorkerManager) rmWorkerFromSets(worker *Worker, benefit string) { func (wm *WorkerManager) delWorkerFromWhiteListSet(worker *Worker, benefit string) {
wm.rdb.SRem(context.Background(), workerSetsKey(benefit), worker.workerAddr) wm.rdb.SRem(context.Background(), workerSetsKey(benefit), worker.workerAddr)
} }
func (wm *WorkerManager) getWorkerSets(benefit string) ([]string, error) { func (wm *WorkerManager) getWorkerWhiteListSetByBenefit(benefit string) ([]string, error) {
list, err := wm.rdb.SMembers(context.Background(), workerSetsKey(benefit)).Result() list, err := wm.rdb.SMembers(context.Background(), workerSetsKey(benefit)).Result()
if err == redis.ErrNil { if err == redis.ErrNil {
return []string{}, nil return []string{}, nil
...@@ -261,7 +242,7 @@ func (wm *WorkerManager) checkWhiteList(worker *Worker, benefit string) error { ...@@ -261,7 +242,7 @@ func (wm *WorkerManager) checkWhiteList(worker *Worker, benefit string) error {
return errors.New("not in white list") return errors.New("not in white list")
} }
maxNodeCount := wh.NodeNum maxNodeCount := wh.NodeNum
nodeList, err := wm.getWorkerSets(benefit) nodeList, err := wm.getWorkerWhiteListSetByBenefit(benefit)
if err != nil { if err != nil {
return errors.New("check worker white list failed") return errors.New("check worker white list failed")
} }
...@@ -299,10 +280,6 @@ func workerDeviceStatusInfoKey(w *Worker) string { ...@@ -299,10 +280,6 @@ func workerDeviceStatusInfoKey(w *Worker) string {
return config.WORKER_DEVICE_STATUS_PREFIX + w.workerAddr return config.WORKER_DEVICE_STATUS_PREFIX + w.workerAddr
} }
func workerNonceKey(w *Worker) string {
return config.WORKER_NONCE_KEY_PREFIX + w.workerAddr
}
func workerSetsKey(benefit string) string { func workerSetsKey(benefit string) string {
return config.WORKER_SETS_PREFIX + benefit return config.WORKER_SETS_PREFIX + benefit
} }
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment