Commit f477de55 authored by vicotor's avatar vicotor

update distribute image

parent 3b9c91f2
...@@ -60,6 +60,7 @@ type Config struct { ...@@ -60,6 +60,7 @@ type Config struct {
Tickers TickerConfig `json:"ticker" toml:"ticker"` Tickers TickerConfig `json:"ticker" toml:"ticker"`
Kafka KafkaConfig `json:"kafka" toml:"kafka"` Kafka KafkaConfig `json:"kafka" toml:"kafka"`
Mongo MongoDbConfig `json:"mongodb" toml:"mongodb"` Mongo MongoDbConfig `json:"mongodb" toml:"mongodb"`
ModelInfoUrl string `json:"model_info_url" toml:"model_info_url"`
} }
var _cfg *Config = nil var _cfg *Config = nil
...@@ -115,3 +116,7 @@ func (conf *Config) GetWorkerSignatureExpiredTime() int64 { ...@@ -115,3 +116,7 @@ func (conf *Config) GetWorkerSignatureExpiredTime() int64 {
} }
return conf.WorkerSignatureExpiredTime return conf.WorkerSignatureExpiredTime
} }
// GetModelInfoUrl returns the configured endpoint used to fetch the
// model catalog (config field ModelInfoUrl, json/toml key "model_info_url").
func (conf *Config) GetModelInfoUrl() string {
	return conf.ModelInfoUrl
}
package distribute package distribute
type Distribute struct { import (
workerList []IWorker "encoding/json"
existing map[string]IWorker "github.com/odysseus/nodemanager/config"
modelList map[string]ModelDetailInfo omanager "github.com/odysseus/odysseus-protocol/gen/proto/go/nodemanager/v2"
"net/http"
"sync"
"time"
)
// Distributor manages one image-distribution worker per connected node
// and keeps a shared, periodically refreshed model catalog.
type Distributor struct {
	// workers maps a node key to its running *imageWorker.
	workers sync.Map
	// modelLib is the shared model catalog, refreshed by loop().
	modelLib ModelLibrary
	// manager supplies cluster-wide worker information.
	manager WorkerManager
}
// StartDistributor builds a Distributor seeded with the model catalog
// fetched from the configured model-info URL and starts the periodic
// refresh goroutine.
// NOTE(review): the error from getModelInfo is silently discarded — on a
// fetch failure the distributor starts with an empty catalog; confirm this
// is acceptable. The loop() goroutine has no stop mechanism, so the
// Distributor can never be shut down cleanly — verify intended lifetime.
func StartDistributor(manager WorkerManager) *Distributor {
	infos, _ := getModelInfo(config.GetConfig().GetModelInfoUrl())
	lib := NewHeapModelInfos(infos)
	dis := &Distributor{
		modelLib: lib,
		manager:  manager,
	}
	go dis.loop()
	return dis
}
func (d *Distributor) AddWorker(id int64, info *omanager.NodeInfoResponse) {
if _, exist := d.workers.Load(id); exist {
return
}
worker := NewImageWorker(d.modelLib, d.manager)
d.workers.Store(info.Info.MinerPubkey, worker)
go worker.DistributeImages()
} }
func NewDistribute() *Distribute { func (d *Distributor) RemoveWorker(id int64) {
return &Distribute{ if w, exist := d.workers.Load(id); exist {
workerList: make([]IWorker, 1000), w.(*imageWorker).Exit()
existing: make(map[string]IWorker),
} }
d.workers.Delete(id)
} }
func (d *Distribute) AttachWorker(w IWorker) bool { func (d *Distributor) UpdateWorkerInfo(id int64, info *omanager.NodeInfoResponse) {
if _, ok := d.existing[w.Address()]; ok { if w, exist := d.workers.Load(id); exist {
return false w.(*imageWorker).UpdateInfo(info)
} }
d.existing[w.Address()] = w
return true
} }
func (d *Distribute) DetachWorker(w IWorker) bool { func (d *Distributor) loop() {
if _, ok := d.existing[w.Address()]; !ok { tc := time.NewTicker(time.Minute * 10)
return false defer tc.Stop()
for {
select {
case <-tc.C:
infos, _ := getModelInfo(config.GetConfig().GetModelInfoUrl())
d.modelLib.UpdateModelInfo(infos)
}
} }
delete(d.existing, w.Address())
return true
} }
func (d *Distribute) schedulerAWorker() { var (
hclient = &http.Client{Timeout: 10 * time.Second}
)
// getModelInfo fetches the model catalog from url and decodes the JSON
// payload's "data" array into ModelDetailInfo values. On any error it
// returns an empty (non-nil) slice alongside the error.
// NOTE(review): the HTTP status code is never checked — a non-200 response
// body is decoded as if it were success (typically yielding an empty list
// or a decode error). Consider rejecting r.StatusCode != http.StatusOK.
func getModelInfo(url string) ([]ModelDetailInfo, error) {
	r, err := hclient.Get(url)
	if err != nil {
		return []ModelDetailInfo{}, err
	}
	// Close the body so the transport can reuse the connection.
	defer r.Body.Close()
	// The endpoint wraps the list in {"data": [...]}.
	data := struct {
		Infos []ModelDetailInfo `json:"data"`
	}{}
	if err := json.NewDecoder(r.Body).Decode(&data); err != nil {
		return []ModelDetailInfo{}, err
	}
	return data.Infos, nil
}
package distribute

import (
	"testing"
)

// TestGetJson exercises getModelInfo against the live model-info endpoint.
// It depends on network access and an external service, so it is skipped
// under "go test -short" to keep offline/CI runs deterministic.
func TestGetJson(t *testing.T) {
	if testing.Short() {
		t.Skip("requires network access to external model-info endpoint")
	}
	infos, err := getModelInfo("http://18.167.103.232/admin/api/task/taskheat")
	if err != nil {
		// Fatal: without a payload the loop below is meaningless.
		t.Fatal(err)
	}
	for _, info := range infos {
		// t.Logf keeps output attached to the test instead of stdout.
		t.Logf("info.id = %d", info.TaskID)
	}
}
package distribute package distribute
// GpuInfo describes a single GPU: a performance score and its memory size.
type GpuInfo struct {
	Performance int64
	Mem         int64
}

// IWorker abstracts a node that reports free disk, address and GPU inventory.
type IWorker interface {
	// DiskReverse reports available disk space.
	// NOTE(review): name looks like a typo for DiskReserve — confirm.
	DiskReverse() int64
	Address() string
	GPUInfo() []GpuInfo
}
type WorkerManager interface { type WorkerManager interface {
WorkerCount() int WorkerCount() int
} }
type ModelLibrary interface { type ModelLibrary interface {
UpdateModelInfo([]ModelDetailInfo)
FindModel(int) ModelDetailInfo FindModel(int) ModelDetailInfo
FindModelByName(string) ModelDetailInfo FindModelByName(string) ModelDetailInfo
InstalledWorkerCount(int) int InstalledWorkerCount(int) int
AllModel() SortedModelDetailInfos AllModel() SortedModelDetailInfos
GetModelUsedLevel(info ModelDetailInfo) ModelUsedLevel GetModelUsedLevel(int) ModelUsedLevel
} }
...@@ -4,6 +4,7 @@ import ( ...@@ -4,6 +4,7 @@ import (
"encoding/json" "encoding/json"
"math/big" "math/big"
"sort" "sort"
"strconv"
"sync" "sync"
"time" "time"
) )
...@@ -40,11 +41,29 @@ func getModelLevel(count int, total int) ModelUsedLevel { ...@@ -40,11 +41,29 @@ func getModelLevel(count int, total int) ModelUsedLevel {
} }
type HardwareRequireInfo struct { type HardwareRequireInfo struct {
DiskSize int64 `json:"disk_size"` DiskSize string `json:"disk_size"`
Gpus []struct { Gpus []struct {
Gpu int64 `json:"gpu"` Gpu string `json:"gpu"`
} }
MemorySize int64 `json:"memory_size"` MemorySize string `json:"memory_size"`
}
func (h HardwareRequireInfo) IntDiskSize() int64 {
size, _ := strconv.ParseInt(h.DiskSize, 10, 64)
return size
}
func (h HardwareRequireInfo) IntMemorySize() int64 {
size, _ := strconv.ParseInt(h.MemorySize, 10, 64)
return size
}
func (h HardwareRequireInfo) IntGpu(idx int) int {
if idx < len(h.Gpus) {
gpu, _ := strconv.Atoi(h.Gpus[idx].Gpu)
return gpu
}
return 0
} }
type ModelDetailInfo struct { type ModelDetailInfo struct {
...@@ -89,6 +108,10 @@ type HeapModelInfos struct { ...@@ -89,6 +108,10 @@ type HeapModelInfos struct {
totalHot *big.Int totalHot *big.Int
} }
// Compile-time assertion that *HeapModelInfos implements ModelLibrary;
// fails the build if the interface and the method set drift apart.
var (
	_ ModelLibrary = (*HeapModelInfos)(nil)
)
func NewHeapModelInfos(models []ModelDetailInfo) *HeapModelInfos { func NewHeapModelInfos(models []ModelDetailInfo) *HeapModelInfos {
sort.Sort(SortedModelDetailInfos(models)) sort.Sort(SortedModelDetailInfos(models))
hm := &HeapModelInfos{ hm := &HeapModelInfos{
...@@ -103,7 +126,11 @@ func NewHeapModelInfos(models []ModelDetailInfo) *HeapModelInfos { ...@@ -103,7 +126,11 @@ func NewHeapModelInfos(models []ModelDetailInfo) *HeapModelInfos {
return hm return hm
} }
func (h *HeapModelInfos) SetModels(models []ModelDetailInfo) { func (h *HeapModelInfos) UpdateModelInfo(models []ModelDetailInfo) {
h.setModels(models)
}
func (h *HeapModelInfos) setModels(models []ModelDetailInfo) {
h.mux.Lock() h.mux.Lock()
defer h.mux.Unlock() defer h.mux.Unlock()
sort.Sort(SortedModelDetailInfos(models)) sort.Sort(SortedModelDetailInfos(models))
...@@ -131,3 +158,36 @@ func (h *HeapModelInfos) GetModelUsedLevel(modelID int) ModelUsedLevel { ...@@ -131,3 +158,36 @@ func (h *HeapModelInfos) GetModelUsedLevel(modelID int) ModelUsedLevel {
return ModelUsedLevelSuperLow return ModelUsedLevelSuperLow
} }
// FindModel returns the model registered under id i, or the zero-value
// ModelDetailInfo when the id is unknown. The lookup is done under the
// library's mutex.
func (h *HeapModelInfos) FindModel(i int) ModelDetailInfo {
	h.mux.Lock()
	defer h.mux.Unlock()
	model, ok := h.modelMap[i]
	if !ok {
		return ModelDetailInfo{}
	}
	return model
}
// FindModelByName linearly scans the model list for the entry whose
// ImageName equals s, returning the zero-value ModelDetailInfo when no
// entry matches. The scan is done under the library's mutex.
func (h *HeapModelInfos) FindModelByName(s string) ModelDetailInfo {
	h.mux.Lock()
	defer h.mux.Unlock()
	for i := range h.models {
		if h.models[i].ImageName == s {
			return h.models[i]
		}
	}
	return ModelDetailInfo{}
}
// InstalledWorkerCount reports how many workers have model i installed.
// Currently a stub that always returns 0 — callers treating 0 as "not
// installed anywhere" will see that for every model until the mongo query
// below is implemented.
func (h *HeapModelInfos) InstalledWorkerCount(i int) int {
	// todo: query the count from mongo.
	return 0
}
// AllModel returns a copy of the sorted model list; the copy prevents
// callers from mutating the library's internal slice.
// Receiver changed to a pointer: the value receiver `(h HeapModelInfos)`
// copied the whole struct — including its mutex — on every call, which is
// a lock-copy bug (go vet copylocks) and meant the Lock/Unlock below
// guarded a throwaway copy rather than the shared state. The pointer
// method still satisfies ModelLibrary via the *HeapModelInfos method set.
func (h *HeapModelInfos) AllModel() SortedModelDetailInfos {
	h.mux.Lock()
	defer h.mux.Unlock()
	m := make(SortedModelDetailInfos, len(h.models))
	copy(m, h.models)
	return m
}
...@@ -23,7 +23,6 @@ func NewImageWorker(modellib ModelLibrary, manager WorkerManager) *imageWorker { ...@@ -23,7 +23,6 @@ func NewImageWorker(modellib ModelLibrary, manager WorkerManager) *imageWorker {
manager: manager, manager: manager,
quit: make(chan struct{}), quit: make(chan struct{}),
} }
} }
func (w *imageWorker) Exit() { func (w *imageWorker) Exit() {
...@@ -83,22 +82,22 @@ func (w *imageWorker) CanInstall(model ModelDetailInfo) bool { ...@@ -83,22 +82,22 @@ func (w *imageWorker) CanInstall(model ModelDetailInfo) bool {
for _, v := range info.Models.WaitToInstallModels { for _, v := range info.Models.WaitToInstallModels {
id, _ := strconv.Atoi(v.ModelId) id, _ := strconv.Atoi(v.ModelId)
minfo := w.modelLibrary.FindModel(id) minfo := w.modelLibrary.FindModel(id)
pending += minfo.HardwareRequire.DiskSize pending += minfo.HardwareRequire.IntDiskSize()
} }
diskPass := (diskFree - pending) >= model.HardwareRequire.DiskSize diskPass := (diskFree - pending) >= model.HardwareRequire.IntDiskSize()
gpuPass := false gpuPass := false
{ {
gpuList := w.info.Hardware.GPU gpuList := w.info.Hardware.GPU
require := model.HardwareRequire require := model.HardwareRequire
minRequire := require.Gpus[0] minGpuRequire := require.IntGpu(0)
for _, v := range require.Gpus { for i, _ := range require.Gpus {
if v.Gpu < minRequire.Gpu { if require.IntGpu(i) < minGpuRequire {
minRequire = v minGpuRequire = require.IntGpu(i)
} }
} }
for _, v := range gpuList { for _, v := range gpuList {
if v.Performance >= int32(minRequire.Gpu) && v.MemTotal >= require.MemorySize { if v.Performance >= int32(minGpuRequire) && v.MemTotal >= require.IntMemorySize() {
gpuPass = true gpuPass = true
break break
} }
...@@ -113,16 +112,16 @@ func (w *imageWorker) CanRun(model ModelDetailInfo) bool { ...@@ -113,16 +112,16 @@ func (w *imageWorker) CanRun(model ModelDetailInfo) bool {
// check gpu memory and performance // check gpu memory and performance
gpuList := info.Hardware.GPU gpuList := info.Hardware.GPU
require := model.HardwareRequire require := model.HardwareRequire
minRequire := require.Gpus[0] minGpuRequire := require.IntGpu(0)
for _, v := range require.Gpus { for i, _ := range require.Gpus {
if v.Gpu < minRequire.Gpu { if require.IntGpu(i) < minGpuRequire {
minRequire = v minGpuRequire = require.IntGpu(i)
} }
} }
gpuPass := false gpuPass := false
for _, v := range gpuList { for _, v := range gpuList {
if v.Performance >= int32(minRequire.Gpu) && v.MemFree >= require.MemorySize { if v.Performance >= int32(minGpuRequire) && v.MemFree >= require.IntMemorySize() {
gpuPass = true gpuPass = true
break break
} }
...@@ -136,16 +135,16 @@ func (w *imageWorker) CanForceRun(model ModelDetailInfo) bool { ...@@ -136,16 +135,16 @@ func (w *imageWorker) CanForceRun(model ModelDetailInfo) bool {
// check gpu memory and performance // check gpu memory and performance
gpuList := info.Hardware.GPU gpuList := info.Hardware.GPU
require := model.HardwareRequire require := model.HardwareRequire
minRequire := require.Gpus[0] minGpuRequire := require.IntGpu(0)
for _, v := range require.Gpus { for i, _ := range require.Gpus {
if v.Gpu < minRequire.Gpu { if require.IntGpu(i) < minGpuRequire {
minRequire = v minGpuRequire = require.IntGpu(i)
} }
} }
gpuPass := false gpuPass := false
for _, v := range gpuList { for _, v := range gpuList {
if v.Performance >= int32(minRequire.Gpu) && v.MemTotal >= require.MemorySize { if v.Performance >= int32(minGpuRequire) && v.MemTotal >= require.IntMemorySize() {
gpuPass = true gpuPass = true
break break
} }
...@@ -162,9 +161,8 @@ func (w *imageWorker) DistributeImages() { ...@@ -162,9 +161,8 @@ func (w *imageWorker) DistributeImages() {
case <-w.quit: case <-w.quit:
return return
case <-tc.C: case <-tc.C:
w.distribute()
// tc.Reset(time.Minute * 20)
tc.Reset(time.Minute * 30)
} }
} }
...@@ -199,7 +197,7 @@ func (w *imageWorker) distribute() { ...@@ -199,7 +197,7 @@ func (w *imageWorker) distribute() {
} }
if mode == HashingMode { if mode == HashingMode {
hash := sha3.Sum256([]byte(info.Info.MinerPubkey)) hash := sha3.Sum256([]byte(info.Info.MinerPubkey))
level := lib.GetModelUsedLevel(model) level := lib.GetModelUsedLevel(model.TaskID)
weights := 0 weights := 0
switch level { switch level {
case ModelUsedLevelSuperLow: case ModelUsedLevelSuperLow:
......
...@@ -9,6 +9,7 @@ import ( ...@@ -9,6 +9,7 @@ import (
"github.com/odysseus/cache/cachedata" "github.com/odysseus/cache/cachedata"
"github.com/odysseus/mogo/operator" "github.com/odysseus/mogo/operator"
"github.com/odysseus/nodemanager/config" "github.com/odysseus/nodemanager/config"
"github.com/odysseus/nodemanager/distribute"
"github.com/odysseus/nodemanager/standardlib" "github.com/odysseus/nodemanager/standardlib"
"github.com/odysseus/nodemanager/utils" "github.com/odysseus/nodemanager/utils"
odysseus "github.com/odysseus/odysseus-protocol/gen/proto/go/base/v1" odysseus "github.com/odysseus/odysseus-protocol/gen/proto/go/base/v1"
...@@ -72,10 +73,12 @@ type WorkerManager struct { ...@@ -72,10 +73,12 @@ type WorkerManager struct {
workerInfoOperator *operator.WorkerInfoOperator workerInfoOperator *operator.WorkerInfoOperator
workerInstalledOperator *operator.WorkerInstalledOperator workerInstalledOperator *operator.WorkerInstalledOperator
workerRunningOperator *operator.WorkerRunningOperator workerRunningOperator *operator.WorkerRunningOperator
distributor *distribute.Distributor
} }
func NewWorkerManager(rdb *redis.Client, node NodeInterface) *WorkerManager { func NewWorkerManager(rdb *redis.Client, node NodeInterface) *WorkerManager {
return &WorkerManager{ wm := &WorkerManager{
heartBeat: make(map[int64]int64), heartBeat: make(map[int64]int64),
workerReg: make(map[int64]*registry.Registry), workerReg: make(map[int64]*registry.Registry),
workers: make(map[int64]*Worker), workers: make(map[int64]*Worker),
...@@ -88,6 +91,8 @@ func NewWorkerManager(rdb *redis.Client, node NodeInterface) *WorkerManager { ...@@ -88,6 +91,8 @@ func NewWorkerManager(rdb *redis.Client, node NodeInterface) *WorkerManager {
workerInstalledOperator: operator.NewDBWorkerInstalled(node.Mongo(), config.GetConfig().Mongo.Database), workerInstalledOperator: operator.NewDBWorkerInstalled(node.Mongo(), config.GetConfig().Mongo.Database),
workerRunningOperator: operator.NewDBWorkerRunning(node.Mongo(), config.GetConfig().Mongo.Database), workerRunningOperator: operator.NewDBWorkerRunning(node.Mongo(), config.GetConfig().Mongo.Database),
} }
wm.distributor = distribute.StartDistributor(wm)
return wm
} }
func (wm *WorkerManager) Stop() { func (wm *WorkerManager) Stop() {
...@@ -202,6 +207,11 @@ func (wm *WorkerManager) doCallback(hook string, response *odysseus.TaskResponse ...@@ -202,6 +207,11 @@ func (wm *WorkerManager) doCallback(hook string, response *odysseus.TaskResponse
} }
} }
// WorkerCount returns the number of workers currently tracked in memory.
// NOTE(review): wm.workers is a plain map that other manager methods
// mutate; confirm callers of WorkerCount hold the manager's lock,
// otherwise this read races with those writes.
func (wm *WorkerManager) WorkerCount() int {
	// todo: read count from mongo.
	return len(wm.workers)
}
func (wm *WorkerManager) disconnect(worker *Worker) { func (wm *WorkerManager) disconnect(worker *Worker) {
worker.online = false worker.online = false
worker.status = "disconnected" worker.status = "disconnected"
...@@ -408,6 +418,7 @@ func (wm *WorkerManager) handleWorkerMsg(worker *Worker) { ...@@ -408,6 +418,7 @@ func (wm *WorkerManager) handleWorkerMsg(worker *Worker) {
defer l.WithField("worker-addr", worker.workerAddr).Info("exit handle worker message") defer l.WithField("worker-addr", worker.workerAddr).Info("exit handle worker message")
defer close(worker.quit) defer close(worker.quit)
defer worker.Disconnect() defer worker.Disconnect()
defer wm.distributor.RemoveWorker(worker.uuid)
checkDuration := config.GetConfig().Tickers.HeartBeat * 3 checkDuration := config.GetConfig().Tickers.HeartBeat * 3
...@@ -594,6 +605,7 @@ func (wm *WorkerManager) handleWorkerMsg(worker *Worker) { ...@@ -594,6 +605,7 @@ func (wm *WorkerManager) handleWorkerMsg(worker *Worker) {
if e := wm.setWorkerLastTaskTime(worker, time.Now().Unix()); e != nil { if e := wm.setWorkerLastTaskTime(worker, time.Now().Unix()); e != nil {
log.WithField("worker", worker.uuid).WithError(e).Error("set worker last task time failed") log.WithField("worker", worker.uuid).WithError(e).Error("set worker last task time failed")
} }
wm.distributor.AddWorker(worker.uuid, worker.info)
default: default:
l.WithField("worker-addr", worker.workerAddr).Error(fmt.Sprintf("unsupport msg type %T", msg)) l.WithField("worker-addr", worker.workerAddr).Error(fmt.Sprintf("unsupport msg type %T", msg))
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment