Commit 46016843 authored by vicotor

fix bug: have the distribute helpers return their operation lists instead of appending to a slice argument, pass node info into NewImageWorker, and add a distributor test

parent 1834d60e
@@ -17,6 +17,10 @@ type Distributor struct {
func StartDistributor(manager WorkerManager) *Distributor {
infos, _ := getModelInfo(config.GetConfig().GetModelInfoUrl())
return interStartDistributor(manager, infos)
}
func interStartDistributor(manager WorkerManager, infos []ModelDetailInfo) *Distributor {
lib := NewHeapModelInfos(infos)
dis := &Distributor{
modelLib: lib,
@@ -31,7 +35,7 @@ func (d *Distributor) AddWorker(id int64, addr string, info *omanager.NodeInfoRe
if _, exist := d.workers.Load(id); exist {
return
}
-worker := NewImageWorker(addr, d.modelLib, d.manager)
+worker := NewImageWorker(addr, d.modelLib, d.manager, info)
d.workers.Store(info.Info.MinerPubkey, worker)
go worker.DistributeImages()
}
...
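The constructor split separates the HTTP fetch in StartDistributor from the wiring in interStartDistributor, which lets the new TestStartDistributor further down feed canned model infos to a Distributor. A minimal sketch of that usage, assuming ModelDetailInfo can be built with just a TaskID (the only field this diff shows):

```go
// Hypothetical helper, not part of this commit: build a Distributor from
// fixed model infos, bypassing the remote model-info URL that
// StartDistributor normally fetches.
func newOfflineDistributor(manager WorkerManager) *Distributor {
	infos := []ModelDetailInfo{
		{TaskID: 1}, // TaskID is assumed from its use in the tests below
	}
	return interStartDistributor(manager, infos)
}
```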
@@ -2,11 +2,48 @@ package distribute
import (
"fmt"
omanager "github.com/odysseus/odysseus-protocol/gen/proto/go/nodemanager/v2"
"sync"
"testing"
"time"
)
var (
modelUrl = "http://18.167.103.232/admin/api/task/taskheat"
)
type demoWorkerManager struct {
workers map[string]*omanager.NodeInfoResponse
mux sync.Mutex
}
func (m *demoWorkerManager) WorkerCount() int {
m.mux.Lock()
defer m.mux.Unlock()
return len(m.workers)
}
func (m *demoWorkerManager) addWorker(addr string, info *omanager.NodeInfoResponse) {
m.mux.Lock()
defer m.mux.Unlock()
m.workers[addr] = info
}
func (m *demoWorkerManager) ModelOperate(addr string, operate []*omanager.ModelOperate) error {
for _, op := range operate {
fmt.Printf("addr: %s - operate: %v\n", addr, op)
}
return nil
}
func generateWorkerManager() *demoWorkerManager {
return &demoWorkerManager{
workers: make(map[string]*omanager.NodeInfoResponse),
}
}
func TestGetJson(t *testing.T) {
-infos, err := getModelInfo("http://18.167.103.232/admin/api/task/taskheat")
+infos, err := getModelInfo(modelUrl)
if err != nil {
t.Error(err)
}
@@ -14,3 +51,65 @@ func TestGetJson(t *testing.T) {
fmt.Printf("info.id = %d\n", info.TaskID)
}
}
func generateAWorker() (string, *omanager.NodeInfoResponse) {
addr := "0x4588Fa9951D44Ffd7Dd783885c83d34694fc5647"
info := &omanager.NodeInfoResponse{
Info: &omanager.NodeInfo{
MinerPubkey: "f41184d231af9eab0338a9deee5542c6efc0c18b99a0cfe6877e16fe453168c728164e4176dd1ff358c43ef2b87ed64e198876c8756dd35abeeddc375300c948",
BenefitAddress: "0xeed5fd3046fa0a33f527e8ade3ae13e80629d436",
},
Models: &omanager.ModelsInfo{
InstalledModels: []*omanager.InstalledModel{},
RunningModels: []*omanager.RunningModel{},
WaitToInstallModels: []*omanager.WaitToInstallModel{},
},
Hardware: &omanager.HardwareInfo{
DISK: &omanager.DiskInfo{
Total: 10 * 1024 * 1024 * 1024 * 1024,
Free: 800,
//Free: 8 * 1024 * 1024 * 1024 * 1024,
},
RAM: &omanager.MemoryInfo{
Total: 128 * 1024 * 1024 * 1024,
Free: 100 * 1024 * 1024 * 1024,
},
CPU: &omanager.CPUInfo{
Model: "Intel(R) Xeon(R) CPU E5-2686 v4 @ 2.30GHz",
Number: 1,
Cores: 8,
Threads: 16,
Usage: 10,
},
GPU: []*omanager.GPUInfo{
&omanager.GPUInfo{
Seq: 0,
Uuid: "111000",
Model: "Tesla V100-SXM2-16GB",
Performance: 10000,
PowerRating: 1000,
PowerRt: 100,
MemTotal: 16 * 1024 * 1024 * 1024,
MemFree: 16 * 1024 * 1024 * 1024,
Usage: 0,
Temp: 20,
},
},
},
}
return addr, info
}
func TestStartDistributor(t *testing.T) {
manager := generateWorkerManager()
infos, err := getModelInfo(modelUrl)
if err != nil {
t.Error(err)
return
}
d := interStartDistributor(manager, infos)
addr, info := generateAWorker()
manager.addWorker(addr, info)
d.AddWorker(0, addr, info)
time.Sleep(time.Hour * 10)
}
package modellibrary
// modelLibrary computes model type, categories (new, hot, cold, etc.), and other model-related information.
type modelLibrary struct {
}
func NewModelLibrary() *modelLibrary {
return &modelLibrary{}
}
func (m *modelLibrary) UpdateModelInfo() {
}
func (m *modelLibrary) FindModel() {
}
@@ -19,9 +19,10 @@ type imageWorker struct {
quit chan struct{}
}
-func NewImageWorker(addr string, modellib ModelLibrary, manager WorkerManager) *imageWorker {
+func NewImageWorker(addr string, modellib ModelLibrary, manager WorkerManager, info *omanager.NodeInfoResponse) *imageWorker {
return &imageWorker{
addr: addr,
info: info,
modelLibrary: modellib,
manager: manager,
quit: make(chan struct{}),
@@ -218,8 +219,9 @@ func (w *imageWorker) distributeMatch(mode DistributeMode, model ModelDetailInfo
return false
}
-func (w *imageWorker) distributeToUnstall(models SortedModelDetailInfos, info omanager.NodeInfoResponse, ops []*omanager.ModelOperate) {
+func (w *imageWorker) distributeToUnstall(models SortedModelDetailInfos, info omanager.NodeInfoResponse) []*omanager.ModelOperate {
lib := w.modelLibrary
var ops = make([]*omanager.ModelOperate, 0)
for _, model := range info.Models.InstalledModels {
id, _ := strconv.Atoi(model.ModelId)
// if the model is not in the model library or the worker can't install the model, then uninstall it.
@@ -227,16 +229,18 @@ func (w *imageWorker) distributeToUnstall(models SortedModelDetailInfos, info om
ops = append(ops, w.getOp(find, omanager.ModelOperateType_DELETE))
}
}
return ops
}
-func (w *imageWorker) distributeToInstall(models SortedModelDetailInfos, info omanager.NodeInfoResponse, ops []*omanager.ModelOperate) {
+func (w *imageWorker) distributeToInstall(models SortedModelDetailInfos, info omanager.NodeInfoResponse) []*omanager.ModelOperate {
ops := make([]*omanager.ModelOperate, 0)
totalWorker := w.manager.WorkerCount()
hash := sha3.Sum256([]byte(info.Info.MinerPubkey))
mode := GreedyMode // greedy mode
-if totalWorker > 10 {
+if totalWorker < 10 {
mode = HashingMode // hashing mode
}
pendingDiskToUsed := int64(0)
@@ -247,14 +251,27 @@ func (w *imageWorker) distributeToInstall(models SortedModelDetailInfos, info om
ops = append(ops, w.getOp(model, omanager.ModelOperateType_INSTALL))
}
}
return ops
}
-func (w *imageWorker) distributeToRun(models SortedModelDetailInfos, info omanager.NodeInfoResponse, ops []*omanager.ModelOperate) {
+func (w *imageWorker) distributeToRun(models SortedModelDetailInfos, info omanager.NodeInfoResponse) []*omanager.ModelOperate {
ops := make([]*omanager.ModelOperate, 0)
for _, model := range models {
if w.IsInstalled(model.TaskID) && !w.IsRunning(model.TaskID) && w.CanRun(model) {
ops = append(ops, w.getOp(model, omanager.ModelOperateType_RUN))
}
}
return ops
}
-func (w *imageWorker) distributeToStopRun(models SortedModelDetailInfos, info omanager.NodeInfoResponse, ops []*omanager.ModelOperate) {
+func (w *imageWorker) distributeToStopRun(models SortedModelDetailInfos, info omanager.NodeInfoResponse) []*omanager.ModelOperate {
ops := make([]*omanager.ModelOperate, 0)
for _, model := range models {
if w.IsRunning(model.TaskID) && !w.CanForceRun(model) {
ops = append(ops, w.getOp(model, omanager.ModelOperateType_STOP))
}
}
return ops
}
func (w *imageWorker) distribute() {
@@ -264,9 +281,11 @@ func (w *imageWorker) distribute() {
info := w.getInfo()
operates := make([]*omanager.ModelOperate, 0)
-w.distributeToUnstall(models, info, operates)
-w.distributeToInstall(models, info, operates)
-w.distributeToRun(models, info, operates)
-w.distributeToStopRun(models, info, operates)
+operates = append(operates, w.distributeToUnstall(models, info)...)
+operates = append(operates, w.distributeToInstall(models, info)...)
+operates = append(operates, w.distributeToRun(models, info)...)
+operates = append(operates, w.distributeToStopRun(models, info)...)
w.manager.ModelOperate(w.addr, operates)
}
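The signature changes above appear to address a common Go pitfall: the old helpers appended to an ops slice received by value, so any growth performed by append never reached the caller, and distribute() could hand an empty operation list to the manager. A standalone sketch of the pitfall and of the return-the-slice pattern this commit adopts (names are illustrative):

```go
package main

import "fmt"

// addOp appends to its parameter. The slice header is copied on the call,
// so when append grows the backing array the caller's slice stays empty.
func addOp(ops []string) {
	ops = append(ops, "install")
}

// addOpFixed returns the grown slice, mirroring the pattern adopted above.
func addOpFixed(ops []string) []string {
	return append(ops, "install")
}

func main() {
	ops := make([]string, 0)
	addOp(ops)
	fmt.Println(len(ops)) // 0: the appended element was lost

	ops = addOpFixed(ops)
	fmt.Println(len(ops)) // 1: the caller keeps the result
}
```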