Commit 61d3804f authored by vicotor

update distribute model to install

parent f477de55
...@@ -27,11 +27,11 @@ func StartDistributor(manager WorkerManager) *Distributor { ...@@ -27,11 +27,11 @@ func StartDistributor(manager WorkerManager) *Distributor {
return dis return dis
} }
func (d *Distributor) AddWorker(id int64, info *omanager.NodeInfoResponse) { func (d *Distributor) AddWorker(id int64, addr string, info *omanager.NodeInfoResponse) {
if _, exist := d.workers.Load(id); exist { if _, exist := d.workers.Load(id); exist {
return return
} }
worker := NewImageWorker(d.modelLib, d.manager) worker := NewImageWorker(addr, d.modelLib, d.manager)
d.workers.Store(info.Info.MinerPubkey, worker) d.workers.Store(info.Info.MinerPubkey, worker)
go worker.DistributeImages() go worker.DistributeImages()
} }
......
package distribute package distribute
import omanager "github.com/odysseus/odysseus-protocol/gen/proto/go/nodemanager/v2"
// WorkerManager lists the worker-manager calls used by the distribute package.
type WorkerManager interface { type WorkerManager interface {
// WorkerCount returns a worker count as an int.
WorkerCount() int WorkerCount() int
// ModelOperate takes a worker address and a list of model operations and
// reports failure through the returned error.
ModelOperate(addr string, operate []*omanager.ModelOperate) error
} }
type ModelLibrary interface { type ModelLibrary interface {
......
package distribute package distribute
import ( import (
"encoding/json"
omanager "github.com/odysseus/odysseus-protocol/gen/proto/go/nodemanager/v2" omanager "github.com/odysseus/odysseus-protocol/gen/proto/go/nodemanager/v2"
"golang.org/x/crypto/sha3" "golang.org/x/crypto/sha3"
"sort" "sort"
...@@ -10,6 +11,7 @@ import ( ...@@ -10,6 +11,7 @@ import (
) )
type imageWorker struct { type imageWorker struct {
addr string
modelLibrary ModelLibrary modelLibrary ModelLibrary
manager WorkerManager manager WorkerManager
mux sync.Mutex mux sync.Mutex
...@@ -17,8 +19,9 @@ type imageWorker struct { ...@@ -17,8 +19,9 @@ type imageWorker struct {
quit chan struct{} quit chan struct{}
} }
func NewImageWorker(modellib ModelLibrary, manager WorkerManager) *imageWorker { func NewImageWorker(addr string, modellib ModelLibrary, manager WorkerManager) *imageWorker {
return &imageWorker{ return &imageWorker{
addr: addr,
modelLibrary: modellib, modelLibrary: modellib,
manager: manager, manager: manager,
quit: make(chan struct{}), quit: make(chan struct{}),
...@@ -175,29 +178,26 @@ const ( ...@@ -175,29 +178,26 @@ const (
HashingMode // 从热度由高到低,选择相匹配的模型进行安装 HashingMode // 从热度由高到低,选择相匹配的模型进行安装
) )
// getOp (new column) builds a ModelOperate of kind opType for model: ModelId
// is the decimal form of model.TaskID, ImageName is copied from the model,
// Cmd is model.Cmd rendered as indented JSON, and Username, Password and
// GpuSeq are set to their zero values.
// The old-column text on the first three rows and on the "return op" row is
// the removed beginning of the previous distribute() body.
func (w *imageWorker) distribute() { func (w *imageWorker) getOp(model ModelDetailInfo, opType omanager.ModelOperateType) *omanager.ModelOperate {
// NOTE(review): the json.MarshalIndent error is discarded; if marshalling
// fails, Cmd ends up as an empty string.
models := w.modelLibrary.AllModel() cmdstr, _ := json.MarshalIndent(model.Cmd, "", " ")
sort.Sort(models) op := new(omanager.ModelOperate)
op.Operate = opType
op.ModelId = strconv.FormatInt(int64(model.TaskID), 10)
op.ImageName = model.ImageName
op.Username = ""
op.Password = ""
op.Cmd = string(cmdstr)
op.GpuSeq = 0
info := w.getInfo() return op
}
lib := w.modelLibrary func (w *imageWorker) distributeMatch(mode DistributeMode, model ModelDetailInfo, hash []byte) bool {
totalWorker := w.manager.WorkerCount()
mode := GreedyMode
// 贪婪模式
if totalWorker > 10 {
mode = HashingMode
}
// 散列模式
for _, model := range models {
if mode == GreedyMode { if mode == GreedyMode {
if w.CanInstall(model) { return true
// todo: quest worker to install the model.
}
} }
if mode == HashingMode { if mode == HashingMode {
hash := sha3.Sum256([]byte(info.Info.MinerPubkey)) level := w.modelLibrary.GetModelUsedLevel(model.TaskID)
level := lib.GetModelUsedLevel(model.TaskID)
weights := 0 weights := 0
switch level { switch level {
case ModelUsedLevelSuperLow: case ModelUsedLevelSuperLow:
...@@ -213,11 +213,50 @@ func (w *imageWorker) distribute() { ...@@ -213,11 +213,50 @@ func (w *imageWorker) distribute() {
case ModelUsedLevelSuperHigh: case ModelUsedLevelSuperHigh:
weights += 80 weights += 80
} }
if int(hash[0]) < (255 * weights / 100) { return int(hash[0]) < (255 * weights / 100)
if w.CanInstall(model) {
// todo: quest worker to install the model.
} }
return false
}
func (w *imageWorker) distributeToUnstall(models SortedModelDetailInfos, info omanager.NodeInfoResponse, ops []*omanager.ModelOperate) {
}
func (w *imageWorker) distributeToInstall(models SortedModelDetailInfos, info omanager.NodeInfoResponse, ops []*omanager.ModelOperate) {
totalWorker := w.manager.WorkerCount()
hash := sha3.Sum256([]byte(info.Info.MinerPubkey))
mode := GreedyMode // 贪婪模式
if totalWorker > 10 {
mode = HashingMode // 散列模式
} }
for _, model := range models {
if w.CanInstall(model) && w.distributeMatch(mode, model, hash[:]) {
ops = append(ops, w.getOp(model, omanager.ModelOperateType_INSTALL))
} }
} }
} }
func (w *imageWorker) distributeToRun(models SortedModelDetailInfos, info omanager.NodeInfoResponse, ops []*omanager.ModelOperate) {
}
func (w *imageWorker) distributeToStopRun(models SortedModelDetailInfos, info omanager.NodeInfoResponse, ops []*omanager.ModelOperate) {
}
func (w *imageWorker) distribute() {
models := w.modelLibrary.AllModel()
sort.Sort(models)
info := w.getInfo()
operates := make([]*omanager.ModelOperate, 0)
w.distributeToUnstall(models, info, operates)
w.distributeToInstall(models, info, operates)
w.distributeToRun(models, info, operates)
w.distributeToStopRun(models, info, operates)
}
...@@ -254,10 +254,17 @@ func (w *Worker) doDeviceUsage(msg *omanager.WorkerMessage_DeviceUsage) error { ...@@ -254,10 +254,17 @@ func (w *Worker) doDeviceUsage(msg *omanager.WorkerMessage_DeviceUsage) error {
"worker-addr": w.workerAddr, "worker-addr": w.workerAddr,
}).Debugf("receive worker device usage:%v", msg.DeviceUsage.Usage) }).Debugf("receive worker device usage:%v", msg.DeviceUsage.Usage)
// 1. update local cache
w.info.Hardware.DISK.Free = int64(msg.DeviceUsage.Usage.DiskUsage)
w.info.Hardware.RAM.Free = int64(msg.DeviceUsage.Usage.RamUsage)
w.info.Hardware.CPU.Usage = int32(msg.DeviceUsage.Usage.CpuUsage)
w.info.Hardware.NET.Bandwidth = int32(msg.DeviceUsage.Usage.NetBandwidth)
if !w.registed { if !w.registed {
return nil return nil
} }
// 1. update usage to hardware mogo.
// 2. update usage to hardware mogo.
return w.infoOp.UpdateHardwareUsage(context.TODO(), w.workerAddr, types.PbToDeviceUsage(msg.DeviceUsage.Usage)) return w.infoOp.UpdateHardwareUsage(context.TODO(), w.workerAddr, types.PbToDeviceUsage(msg.DeviceUsage.Usage))
} }
...@@ -331,6 +338,30 @@ func (w *Worker) doAddInstalledModel(msg *omanager.WorkerMessage_AddModelInstall ...@@ -331,6 +338,30 @@ func (w *Worker) doAddInstalledModel(msg *omanager.WorkerMessage_AddModelInstall
if !w.registed { if !w.registed {
return return
} }
// 0. update local cache.
for _, model := range msg.AddModelInstalled.Models {
if w.info.Models.InstalledModels == nil {
w.info.Models.InstalledModels = make([]*omanager.InstalledModel, 0)
}
// if model already installed, skip.
exist := false
for _, installed := range w.info.Models.InstalledModels {
if installed.ModelId == model.ModelId {
exist = true
break
}
}
if !exist {
w.info.Models.InstalledModels = append(w.info.Models.InstalledModels, model)
}
// remove from wait to install.
for idx, waitted := range w.info.Models.WaitToInstallModels {
if waitted.ModelId == model.ModelId {
w.info.Models.WaitToInstallModels = append(w.info.Models.WaitToInstallModels[:idx], w.info.Models.WaitToInstallModels[idx+1:]...)
break
}
}
}
models := make([]*types.InstalledModel, 0) models := make([]*types.InstalledModel, 0)
log.WithFields(log.Fields{ log.WithFields(log.Fields{
"worker-addr": w.workerAddr, "worker-addr": w.workerAddr,
...@@ -377,6 +408,17 @@ func (w *Worker) doRemoveInstalledModel(msg *omanager.WorkerMessage_DelModelInst ...@@ -377,6 +408,17 @@ func (w *Worker) doRemoveInstalledModel(msg *omanager.WorkerMessage_DelModelInst
//if !w.registed { //if !w.registed {
// return // return
//} //}
// 0. update local cache.
for _, model := range msg.DelModelInstalled.ModelIds {
// remove from installed.
for idx, installed := range w.info.Models.InstalledModels {
if installed.ModelId == model {
w.info.Models.InstalledModels = append(w.info.Models.InstalledModels[:idx], w.info.Models.InstalledModels[idx+1:]...)
break
}
}
}
models := make([]int, 0) models := make([]int, 0)
log.WithFields(log.Fields{ log.WithFields(log.Fields{
"worker-addr": w.workerAddr, "worker-addr": w.workerAddr,
...@@ -405,6 +447,16 @@ func (w *Worker) doInstalledModelStatus(msg *omanager.WorkerMessage_InstalledMod ...@@ -405,6 +447,16 @@ func (w *Worker) doInstalledModelStatus(msg *omanager.WorkerMessage_InstalledMod
//if !w.registed { //if !w.registed {
// return // return
//} //}
// 0. update local cache.
for _, installed := range w.info.Models.InstalledModels {
if installed.ModelId == msg.InstalledModelStatus.ModelId {
installed.LastRunTime = msg.InstalledModelStatus.LastRunTime
break
}
}
log.WithFields(log.Fields{ log.WithFields(log.Fields{
"worker-addr": w.workerAddr, "worker-addr": w.workerAddr,
}).Debugf("receive worker installed model status:%v", msg.InstalledModelStatus) }).Debugf("receive worker installed model status:%v", msg.InstalledModelStatus)
...@@ -420,6 +472,16 @@ func (w *Worker) doRunningModelStatus(msg *omanager.WorkerMessage_RunningModelSt ...@@ -420,6 +472,16 @@ func (w *Worker) doRunningModelStatus(msg *omanager.WorkerMessage_RunningModelSt
//if !w.registed { //if !w.registed {
// return // return
//} //}
// 0. update local cache.
for _, running := range w.info.Models.RunningModels {
if running.ModelId == msg.RunningModelStatus.ModelId {
running.LastWorkTime = msg.RunningModelStatus.LastWorkTime
running.TotalRunCount = int32(msg.RunningModelStatus.TotalRunCount)
running.ExecTime = int32(msg.RunningModelStatus.ExecTime)
break
}
}
log.WithFields(log.Fields{ log.WithFields(log.Fields{
"worker-addr": w.workerAddr, "worker-addr": w.workerAddr,
}).Debugf("receive worker running model status:%v", msg.RunningModelStatus) }).Debugf("receive worker running model status:%v", msg.RunningModelStatus)
...@@ -492,20 +554,22 @@ func (w *Worker) getMaxGPUFree() *omanager.GPUInfo { ...@@ -492,20 +554,22 @@ func (w *Worker) getMaxGPUFree() *omanager.GPUInfo {
return maxGpuFree return maxGpuFree
} }
// ModelOperate (new column) wraps operators in a ManagerMessage carrying a
// ModelOperateRequest, hands it to SendToWorker together with a callback, and
// waits for that callback before returning the error it received.
// Old-column text on these rows is the removed previous implementation, which
// built and returned a request containing a single mostly-empty ModelOperate.
func (w *Worker) ModelOperate(info interface{}, operate omanager.ModelOperateType) *omanager.ManagerMessage_ModelOperateRequest { func (w *Worker) ModelOperate(operators []*omanager.ModelOperate) error {
request := &omanager.ManagerMessage_ModelOperateRequest{ msg := &omanager.ManagerMessage{
Message: &omanager.ManagerMessage_ModelOperateRequest{
ModelOperateRequest: &omanager.ModelOperateRequest{ ModelOperateRequest: &omanager.ModelOperateRequest{
ModelOperates: []*omanager.ModelOperate{ ModelOperates: operators,
{
ModelId: "",
ImageName: "",
Username: "",
Password: "",
Cmd: "",
Operate: operate,
},
},
},
}
return request var err error
// send is closed by the callback to signal that err has been set.
send := make(chan bool)
callback := func(e error) bool {
err = e
close(send)
return true
}
w.SendToWorker(msg, callback)
// NOTE(review): this receive has no timeout; if SendToWorker never invokes
// the callback the caller blocks indefinitely — confirm it always does.
<-send
return err
} }
...@@ -605,7 +605,7 @@ func (wm *WorkerManager) handleWorkerMsg(worker *Worker) { ...@@ -605,7 +605,7 @@ func (wm *WorkerManager) handleWorkerMsg(worker *Worker) {
if e := wm.setWorkerLastTaskTime(worker, time.Now().Unix()); e != nil { if e := wm.setWorkerLastTaskTime(worker, time.Now().Unix()); e != nil {
log.WithField("worker", worker.uuid).WithError(e).Error("set worker last task time failed") log.WithField("worker", worker.uuid).WithError(e).Error("set worker last task time failed")
} }
wm.distributor.AddWorker(worker.uuid, worker.info) wm.distributor.AddWorker(worker.uuid, worker.workerAddr, worker.info)
default: default:
l.WithField("worker-addr", worker.workerAddr).Error(fmt.Sprintf("unsupport msg type %T", msg)) l.WithField("worker-addr", worker.workerAddr).Error(fmt.Sprintf("unsupport msg type %T", msg))
...@@ -678,3 +678,11 @@ func (wm *WorkerManager) makeTaskProof(worker *Worker, task *odysseus.TaskConten ...@@ -678,3 +678,11 @@ func (wm *WorkerManager) makeTaskProof(worker *Worker, task *odysseus.TaskConten
} }
return proof return proof
} }
// ModelOperate forwards the given model operations to the worker registered
// under addr and returns that worker's result. When no worker is found for
// addr it returns a "worker not found" error instead.
func (wm *WorkerManager) ModelOperate(addr string, operate []*omanager.ModelOperate) error {
	if target := wm.GetWorkerByAddr(addr); target != nil {
		return target.ModelOperate(operate)
	}
	return errors.New("worker not found")
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment