Commit 3b9c91f2 authored by vicotor

add simple distribute

parent 14a73b0d
package distribute

// Distribute tracks the attached workers and the models known to the scheduler.
type Distribute struct {
	workerList []IWorker
	existing   map[string]IWorker
	modelList  map[string]ModelDetailInfo
}

func NewDistribute() *Distribute {
	return &Distribute{
		workerList: make([]IWorker, 0, 1000), // capacity only; a non-zero length would pre-fill nil workers
		existing:   make(map[string]IWorker),
		modelList:  make(map[string]ModelDetailInfo), // initialize to avoid writes to a nil map
	}
}
// AttachWorker registers a worker by address; it returns false if a worker
// with the same address is already attached.
func (d *Distribute) AttachWorker(w IWorker) bool {
	if _, ok := d.existing[w.Address()]; ok {
		return false
	}
	d.existing[w.Address()] = w
	return true
}

// DetachWorker removes a worker by address; it returns false if the worker
// is not attached.
func (d *Distribute) DetachWorker(w IWorker) bool {
	if _, ok := d.existing[w.Address()]; !ok {
		return false
	}
	delete(d.existing, w.Address())
	return true
}
// schedulerAWorker is an empty stub: worker selection is not implemented yet.
func (d *Distribute) schedulerAWorker() {
}
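A minimal usage sketch of the type above; stubWorker and the sample address are hypothetical, for illustration only:

type stubWorker struct{ addr string }

func (s stubWorker) DiskReverse() int64 { return 0 }
func (s stubWorker) Address() string    { return s.addr }
func (s stubWorker) GPUInfo() []GpuInfo { return nil }

func ExampleDistribute() {
	d := NewDistribute()
	w := stubWorker{addr: "0xabc"}
	_ = d.AttachWorker(w) // true: first attach succeeds
	_ = d.AttachWorker(w) // false: duplicate address is rejected
	_ = d.DetachWorker(w) // true: the worker is removed
}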
package distribute

// GpuInfo describes a single GPU's performance score and memory size.
type GpuInfo struct {
	Performance int64
	Mem         int64
}

// IWorker is the scheduler's view of a single compute node.
type IWorker interface {
	DiskReverse() int64
	Address() string
	GPUInfo() []GpuInfo
}

// WorkerManager reports aggregate worker state.
type WorkerManager interface {
	WorkerCount() int
}

// ModelLibrary looks up model metadata and usage statistics.
type ModelLibrary interface {
	FindModel(int) ModelDetailInfo
	FindModelByName(string) ModelDetailInfo
	InstalledWorkerCount(int) int
	AllModel() SortedModelDetailInfos
	GetModelUsedLevel(info ModelDetailInfo) ModelUsedLevel
}
package distribute

import (
	"encoding/json"
	"math/big"
	"sort"
	"sync"
	"time"
)

type ModelUsedLevel int

const (
	ModelUsedLevelSuperLow  ModelUsedLevel = iota // < 2%
	ModelUsedLevelVeryLow                         // 2% ~ 5%
	ModelUsedLevelLow                             // 5% ~ 10%
	ModelUsedLevelMiddle                          // 10% ~ 30%
	ModelUsedLevelHigh                            // 30% ~ 50%
	ModelUsedLevelVeryHigh                        // 50% ~ 80%
	ModelUsedLevelSuperHigh                       // >= 80%
)
// getModelLevel maps a model's share of total usage to a ModelUsedLevel bucket.
func getModelLevel(count int, total int) ModelUsedLevel {
	if total <= 0 {
		return ModelUsedLevelSuperLow // avoid division by zero when no usage is recorded
	}
	usedRate := float64(count) / float64(total)
	switch {
	case usedRate < 0.02:
		return ModelUsedLevelSuperLow
	case usedRate < 0.05:
		return ModelUsedLevelVeryLow
	case usedRate < 0.1:
		return ModelUsedLevelLow
	case usedRate < 0.3:
		return ModelUsedLevelMiddle
	case usedRate < 0.5:
		return ModelUsedLevelHigh
	case usedRate < 0.8:
		return ModelUsedLevelVeryHigh
	default:
		return ModelUsedLevelSuperHigh
	}
}
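A quick check of the bucketing above (assuming an fmt import):

func ExampleGetModelLevel() {
	fmt.Println(getModelLevel(25, 100) == ModelUsedLevelMiddle)  // true: 0.25 falls in 10% ~ 30%
	fmt.Println(getModelLevel(1, 100) == ModelUsedLevelSuperLow) // true: 0.01 < 2%
}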
// HardwareRequireInfo describes the disk, GPU, and memory a model requires.
type HardwareRequireInfo struct {
	DiskSize int64 `json:"disk_size"`
	Gpus     []struct {
		Gpu int64 `json:"gpu"`
	} `json:"gpus"` // tag assumed: the field had none, and the other fields use snake_case
	MemorySize int64 `json:"memory_size"`
}
// ModelDetailInfo is the model metadata and usage record returned by the model library.
type ModelDetailInfo struct {
	Time            time.Time           `json:"time"`
	Count           int                 `json:"count"`
	HardwareRequire HardwareRequireInfo `json:"hardware_require"`
	ImageName       string              `json:"image_name"`
	SignURL         string              `json:"sign_url"`
	TaskID          int                 `json:"task_id"`
	Kind            int                 `json:"kind"`
	FileExpiresTime string              `json:"file_expires_time"`
	AccessStatus    int                 `json:"access_status"`
	PublishStatus   int                 `json:"publish_status"`
	EstimateExeTime int                 `json:"estimat_exe_time"`
	StartUpTime     int                 `json:"start_up_time"`
	RunningMem      int                 `json:"running_mem"`
	Cmd             json.RawMessage     `json:"cmd"`
}
// SortedModelDetailInfos orders models by ascending usage count, breaking ties by task ID.
type SortedModelDetailInfos []ModelDetailInfo

// implement the sort interface
func (s SortedModelDetailInfos) Len() int {
	return len(s)
}

func (s SortedModelDetailInfos) Less(i, j int) bool {
	if s[i].Count == s[j].Count {
		return s[i].TaskID < s[j].TaskID
	}
	return s[i].Count < s[j].Count
}

func (s SortedModelDetailInfos) Swap(i, j int) {
	s[i], s[j] = s[j], s[i]
}
// HeapModelInfos is a mutex-guarded, sorted snapshot of model usage.
type HeapModelInfos struct {
	mux      sync.Mutex
	models   SortedModelDetailInfos
	modelMap map[int]ModelDetailInfo
	totalHot *big.Int
}
// NewHeapModelInfos builds a snapshot from an initial model list.
func NewHeapModelInfos(models []ModelDetailInfo) *HeapModelInfos {
	hm := &HeapModelInfos{}
	hm.SetModels(models)
	return hm
}
// SetModels replaces the snapshot with a freshly sorted model list and
// recomputes the per-model map and the total usage count.
func (h *HeapModelInfos) SetModels(models []ModelDetailInfo) {
	h.mux.Lock()
	defer h.mux.Unlock()
	sort.Sort(SortedModelDetailInfos(models))
	h.models = models
	h.totalHot = big.NewInt(0)
	h.modelMap = make(map[int]ModelDetailInfo)
	for _, model := range models {
		h.modelMap[model.TaskID] = model
		h.totalHot.Add(h.totalHot, big.NewInt(int64(model.Count)))
	}
}
// GetSortedModels returns a copy of the sorted model list so callers cannot
// mutate the snapshot outside the lock.
func (h *HeapModelInfos) GetSortedModels() []ModelDetailInfo {
	h.mux.Lock()
	defer h.mux.Unlock()
	out := make([]ModelDetailInfo, len(h.models))
	copy(out, h.models)
	return out
}

// GetModelUsedLevel returns the usage bucket for modelID; unknown models
// default to the lowest bucket.
func (h *HeapModelInfos) GetModelUsedLevel(modelID int) ModelUsedLevel {
	h.mux.Lock()
	defer h.mux.Unlock()
	if model, ok := h.modelMap[modelID]; ok {
		return getModelLevel(model.Count, int(h.totalHot.Int64()))
	}
	return ModelUsedLevelSuperLow
}
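A minimal sketch of the snapshot type above; the task IDs and counts are made up, and an fmt import is assumed:

func ExampleHeapModelInfos() {
	hm := NewHeapModelInfos([]ModelDetailInfo{
		{TaskID: 1, Count: 70},
		{TaskID: 2, Count: 30},
	})
	fmt.Println(hm.GetSortedModels()[0].TaskID) // 2: lower count sorts first
	fmt.Println(hm.GetModelUsedLevel(1))        // 5 (ModelUsedLevelVeryHigh): 70/100 falls in 50% ~ 80%
}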
package distribute

import (
	"sort"
	"strconv"
	"sync"
	"time"

	omanager "github.com/odysseus/odysseus-protocol/gen/proto/go/nodemanager/v2"
	"golang.org/x/crypto/sha3"
)
// imageWorker decides which model images a single node should install and run.
type imageWorker struct {
	modelLibrary ModelLibrary
	manager      WorkerManager
	mux          sync.Mutex
	info         *omanager.NodeInfoResponse
	quit         chan struct{}
}

func NewImageWorker(modellib ModelLibrary, manager WorkerManager) *imageWorker {
	return &imageWorker{
		modelLibrary: modellib,
		manager:      manager,
		quit:         make(chan struct{}),
	}
}
// Exit stops the distribution loop.
func (w *imageWorker) Exit() {
	defer func() {
		recover() // tolerate a double close of quit
	}()
	close(w.quit)
}
func (w *imageWorker) UpdateInfo(info *omanager.NodeInfoResponse) {
	w.mux.Lock()
	defer w.mux.Unlock()
	w.info = info
}

func (w *imageWorker) getInfo() omanager.NodeInfoResponse {
	w.mux.Lock()
	defer w.mux.Unlock()
	if w.info == nil {
		return omanager.NodeInfoResponse{} // no report received yet
	}
	return *w.info
}

// Seed returns the seed of the worker.
func (w *imageWorker) Seed() []byte {
	info := w.getInfo()
	return []byte(info.Info.MinerPubkey)
}
// IsInstalled reports whether the model is installed on the worker.
func (w *imageWorker) IsInstalled(modelId int) bool {
	info := w.getInfo()
	if info.Models == nil {
		return false
	}
	for _, v := range info.Models.InstalledModels {
		id, _ := strconv.Atoi(v.ModelId)
		if id == modelId {
			return true
		}
	}
	return false
}

// IsRunning reports whether the model is running on the worker.
func (w *imageWorker) IsRunning(modelId int) bool {
	info := w.getInfo()
	if info.Models == nil {
		return false
	}
	for _, v := range info.Models.RunningModels {
		id, _ := strconv.Atoi(v.ModelId)
		if id == modelId {
			return true
		}
	}
	return false
}
// CanInstall reports whether the worker has enough free disk (net of pending
// installs) and a GPU that meets the model's minimum requirement.
func (w *imageWorker) CanInstall(model ModelDetailInfo) bool {
	info := w.getInfo()
	diskFree := info.Hardware.DISK.Free
	pending := int64(0)
	for _, v := range info.Models.WaitToInstallModels {
		id, _ := strconv.Atoi(v.ModelId)
		minfo := w.modelLibrary.FindModel(id)
		pending += minfo.HardwareRequire.DiskSize
	}
	diskPass := (diskFree - pending) >= model.HardwareRequire.DiskSize
	gpuPass := false
	{
		gpuList := info.Hardware.GPU // use the locked snapshot, not w.info directly
		require := model.HardwareRequire
		if len(require.Gpus) == 0 {
			return false // no GPU requirement listed; nothing to match against
		}
		minRequire := require.Gpus[0]
		for _, v := range require.Gpus {
			if v.Gpu < minRequire.Gpu {
				minRequire = v
			}
		}
		for _, v := range gpuList {
			if v.Performance >= int32(minRequire.Gpu) && v.MemTotal >= require.MemorySize {
				gpuPass = true
				break
			}
		}
	}
	return diskPass && gpuPass
}
// CanRun reports whether the worker can run the model right now, without
// stopping any other models (checks free GPU memory).
func (w *imageWorker) CanRun(model ModelDetailInfo) bool {
	info := w.getInfo()
	// check gpu memory and performance
	gpuList := info.Hardware.GPU
	require := model.HardwareRequire
	if len(require.Gpus) == 0 {
		return false // no GPU requirement listed; nothing to match against
	}
	minRequire := require.Gpus[0]
	for _, v := range require.Gpus {
		if v.Gpu < minRequire.Gpu {
			minRequire = v
		}
	}
	gpuPass := false
	for _, v := range gpuList {
		if v.Performance >= int32(minRequire.Gpu) && v.MemFree >= require.MemorySize {
			gpuPass = true
			break
		}
	}
	return gpuPass
}
// CanForceRun reports whether the worker could run the model if other models
// were stopped first (checks total GPU memory rather than free memory).
func (w *imageWorker) CanForceRun(model ModelDetailInfo) bool {
	info := w.getInfo()
	// check gpu memory and performance
	gpuList := info.Hardware.GPU
	require := model.HardwareRequire
	if len(require.Gpus) == 0 {
		return false // no GPU requirement listed; nothing to match against
	}
	minRequire := require.Gpus[0]
	for _, v := range require.Gpus {
		if v.Gpu < minRequire.Gpu {
			minRequire = v
		}
	}
	gpuPass := false
	for _, v := range gpuList {
		if v.Performance >= int32(minRequire.Gpu) && v.MemTotal >= require.MemorySize {
			gpuPass = true
			break
		}
	}
	return gpuPass
}
// DistributeImages runs the distribution loop: a first pass after one second,
// then one pass every 30 minutes until Exit is called.
func (w *imageWorker) DistributeImages() {
	tc := time.NewTicker(time.Second)
	defer tc.Stop()
	for {
		select {
		case <-w.quit:
			return
		case <-tc.C:
			w.distribute()
			tc.Reset(time.Minute * 30)
		}
	}
}
type DistributeMode int

const (
	GreedyMode  DistributeMode = iota // from highest usage to lowest, install as many models as possible
	HashingMode                       // from highest usage to lowest, install only the models hashed to this worker
)
func (w *imageWorker) distribute() {
	models := w.modelLibrary.AllModel()
	// Iterate from the hottest model down, as the mode comments describe
	// (SortedModelDetailInfos sorts ascending by count).
	sort.Sort(sort.Reverse(models))
	info := w.getInfo()
	lib := w.modelLibrary
	totalWorker := w.manager.WorkerCount()
	// Greedy mode for small fleets; switch to hashing mode once there are
	// enough workers to spread models probabilistically.
	mode := GreedyMode
	if totalWorker > 10 {
		mode = HashingMode
	}
	for _, model := range models {
		if mode == GreedyMode {
			if w.CanInstall(model) {
				// todo: request the worker to install the model.
			}
		}
		if mode == HashingMode {
			// Mix the model ID into the hash so each model selects an
			// independent subset of workers.
			seed := append([]byte(info.Info.MinerPubkey), []byte(strconv.Itoa(model.TaskID))...)
			hash := sha3.Sum256(seed)
			weights := 0
			switch lib.GetModelUsedLevel(model) {
			case ModelUsedLevelSuperLow:
				weights = 2
			case ModelUsedLevelVeryLow:
				weights = 5
			case ModelUsedLevelLow:
				weights = 10
			case ModelUsedLevelMiddle:
				weights = 30
			case ModelUsedLevelHigh:
				weights = 50
			case ModelUsedLevelVeryHigh:
				weights = 80
			case ModelUsedLevelSuperHigh:
				weights = 100 // install super-hot models on every worker
			}
			// Install with probability ~weights%: the first hash byte is
			// uniform over 0..255, so compare it against weights% of 255.
			if int(hash[0]) < (255 * weights / 100) {
				if w.CanInstall(model) {
					// todo: request the worker to install the model.
				}
			}
		}
	}
}
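As a sanity check of the hashing-mode threshold arithmetic, a standalone sketch (the pubkey and task ID are made up): a weight of 30 gives a threshold of 255*30/100 = 76, so roughly 30% of (worker, model) pairs hash below it.

package main

import (
	"fmt"

	"golang.org/x/crypto/sha3"
)

func main() {
	seed := append([]byte("example-miner-pubkey"), []byte("42")...)
	hash := sha3.Sum256(seed)
	weights := 30                    // ModelUsedLevelMiddle
	threshold := 255 * weights / 100 // = 76
	fmt.Println(int(hash[0]) < threshold) // install iff the first hash byte is below the threshold
}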