Commit 5c2b377e authored by luxq

update worker info and test

parent 53d9845c
......@@ -95,28 +95,12 @@ func (d *dbWorker) UpdateNodeInfo(ctx context.Context, id string, nodeInfo *type
return err
}
// FindWorkerByInstalledModelId returns every worker document that has at least
// one installed model whose model_id equals modelId.
//
// Errors from the Find call or from decoding the cursor are returned unchanged,
// together with a nil slice.
func (d *dbWorker) FindWorkerByInstalledModelId(ctx context.Context, modelId string) ([]*DbWorkerInfo, error) {
	cur, err := d.col.Find(ctx, bson.M{"model_infos.installed_models.model_id": modelId})
	if err != nil {
		return nil, err
	}
	defer cur.Close(ctx)

	var found []*DbWorkerInfo
	if decodeErr := cur.All(ctx, &found); decodeErr != nil {
		return nil, decodeErr
	}
	return found, nil
}
func (d *dbWorker) FindWorkerByRunningModelIdWithLimit(ctx context.Context, modelId string, limit int64) ([]*DbWorkerInfo, error) {
func (d *dbWorker) FindWorkerByRunningModelAndSortByWaitTime(ctx context.Context, modelId string, limit int) ([]*DbWorkerInfo, error) {
// find all worker that at least one running model's mode_id is equal modelId
// sort by wait time
findOptions := options.Find()
findOptions.SetLimit(limit)
findOptions.SetSort(bson.D{{"hardware.gpu.usage", 1}})
findOptions.SetLimit(int64(limit))
findOptions.SetSort(bson.D{{"model_infos.running_models.wait_time", 1}})
selector := bson.M{"model_infos.running_models.model_id": modelId}
cursor, err := d.col.Find(ctx, selector, findOptions)
......@@ -132,27 +116,15 @@ func (d *dbWorker) FindWorkerByRunningModelIdWithLimit(ctx context.Context, mode
return workers, nil
}
// FindWorkerByRunningModelId returns every worker document that has at least
// one running model whose model_id equals modelId.
//
// Errors from the Find call or from decoding the cursor are returned unchanged,
// together with a nil slice.
func (d *dbWorker) FindWorkerByRunningModelId(ctx context.Context, modelId string) ([]*DbWorkerInfo, error) {
	cur, err := d.col.Find(ctx, bson.M{"model_infos.running_models.model_id": modelId})
	if err != nil {
		return nil, err
	}
	defer cur.Close(ctx)

	var found []*DbWorkerInfo
	if decodeErr := cur.All(ctx, &found); decodeErr != nil {
		return nil, decodeErr
	}
	return found, nil
}
func (d *dbWorker) FindWorkerByGpuModel(ctx context.Context, model string) ([]*DbWorkerInfo, error) {
// find all worker that at least one gpu model is equal model.
selector := bson.M{"hardware.gpu.model": model}
func (d *dbWorker) FindWorkerByInstallModelAndSortByGpuRam(ctx context.Context, modelId string, performance int, ram int, limit int) ([]*DbWorkerInfo, error) {
// find all worker that at least one installed model's mode_id is equal modelId
// sort by gpu ram
findOptions := options.Find()
findOptions.SetLimit(int64(limit))
findOptions.SetSort(bson.D{{"hardware.gpu.ram", 1}})
cursor, err := d.col.Find(ctx, selector)
selector := bson.M{"model_infos.installed_models.model_id": modelId, "hardware.gpu.performance": bson.M{"$gte": performance}, "hardware.gpu.ram": bson.M{"$gte": ram}}
cursor, err := d.col.Find(ctx, selector, findOptions)
if err != nil {
return nil, err
}
......@@ -163,21 +135,5 @@ func (d *dbWorker) FindWorkerByGpuModel(ctx context.Context, model string) ([]*D
return nil, err
}
return workers, nil
}
// FindWorkerByGpuRam returns every worker document that has at least one GPU
// whose "hardware.gpu.ram" value is greater than or equal to ram.
//
// Errors from the Find call or from decoding the cursor are returned unchanged,
// together with a nil slice.
func (d *dbWorker) FindWorkerByGpuRam(ctx context.Context, ram uint) ([]*DbWorkerInfo, error) {
	atLeast := bson.M{"$gte": ram}
	cur, err := d.col.Find(ctx, bson.M{"hardware.gpu.ram": atLeast})
	if err != nil {
		return nil, err
	}
	defer cur.Close(ctx)

	var found []*DbWorkerInfo
	if decodeErr := cur.All(ctx, &found); decodeErr != nil {
		return nil, decodeErr
	}
	return found, nil
}
......@@ -63,7 +63,7 @@ func generateAWroker() *DbWorkerInfo {
return &DbWorkerInfo{
WorkerId: uuid.NewString(),
NodeInfo: generateANodeInfo(),
Models: generateAmodel(),
Models: generateAModel(),
Hardware: generateAHardware(),
}
}
......@@ -85,28 +85,38 @@ func generateANodeInfo() *types.NodeInfo {
func generateACpu() *types.CpuInfo {
return &types.CpuInfo{
Model: "Intel i9",
Core: 8,
Usage: uint(rand.Intn(50) + 30),
Model: "Intel(R) Core(TM) i7-8700 CPU @ 3.20GHz",
Number: 1,
Cores: 8,
Threads: 16,
Usage: int(rand.Intn(30) + 40),
}
}
func generateADisk() *types.DiskInfo {
return &types.DiskInfo{
Size: 1024,
Usage: uint(rand.Intn(100) + 100),
Total: 1024 * 1024 * 1024,
Free: 100 * 1024 * 1024,
}
}
// generateANet builds a NetInfo fixture: a random address in 192.168.1.0-254,
// the fixed MAC 01:02:03:04:05:06 and a random bandwidth in [60, 90).
func generateANet() *types.NetInfo {
	net := new(types.NetInfo)
	// Keep the two rand draws in this order so the generated sequence is unchanged.
	net.IP = fmt.Sprintf("192.168.1.%d", rand.Intn(255))
	net.Mac = fmt.Sprintf("%02x:%02x:%02x:%02x:%02x:%02x", 1, 2, 3, 4, 5, 6)
	net.Bandwidth = 60 + rand.Intn(30)
	return net
}
func generateARam() *types.RamInfo {
return &types.RamInfo{
Size: 8 * (1 << 30),
Usage: uint(rand.Intn(30) + 40),
Total: 32 * 1024 * 1024 * 1024,
Free: rand.Intn(10) * 1024 * 1024 * 1024,
}
}
func generateAGpuRam() uint {
return uint(rand.Intn(3)*8 + 8) // 8, 16, 24
func generateAGpuRam() int {
return 1024 * 1024 * 1024 * (rand.Intn(3)*8 + 8) // 8, 16, 24
}
func generateAGpuModel() string {
......@@ -114,61 +124,88 @@ func generateAGpuModel() string {
return fmt.Sprintf("Nvidia %d", m)
}
func generateAIdleGpu() *types.GpuInfo {
// generateAGpuPerformance returns a pseudo-random GPU performance score in
// the half-open range [500, 600).
func generateAGpuPerformance() int {
	const (
		floor  = 500
		spread = 100
	)
	return floor + rand.Intn(spread)
}
func generateAIdleGpu(seq int) *types.GpuInfo {
ram := generateAGpuRam()
return &types.GpuInfo{
Seq: seq,
UUID: uuid.NewString(),
Model: generateAGpuModel(),
Ram: generateAGpuRam(),
Performance: generateAGpuPerformance(),
PowerRating: 100,
MemTotal: ram,
MemFree: ram,
Usage: 0,
Temp: 40,
PowerRt: 30,
}
}
func generateAUsageGpu() *types.GpuInfo {
func generateAUsageGpu(seq int) *types.GpuInfo {
ram := generateAGpuRam()
return &types.GpuInfo{
Seq: seq,
UUID: uuid.NewString(),
Model: generateAGpuModel(),
Ram: ram,
Usage: uint(rand.Intn(10) + 30),
Occupy: ram * 10 / 7,
Temp: 60,
Performance: generateAGpuPerformance(),
PowerRating: 100,
MemTotal: ram,
MemFree: ram * (rand.Intn(3) + 3) / 10,
Usage: rand.Intn(20) + 70,
Temp: 40,
PowerRt: 30,
}
}
func generateAHardware() *types.HardwareInfo {
return &types.HardwareInfo{
Cpu: []*types.CpuInfo{
generateACpu(),
CPU: generateACpu(),
GPU: []*types.GpuInfo{
generateAIdleGpu(0),
generateAIdleGpu(1),
generateAUsageGpu(2),
},
Gpu: []*types.GpuInfo{
generateAIdleGpu(),
generateAUsageGpu(),
},
Ram: generateARam(),
Disk: generateADisk(),
RAM: generateARam(),
DISK: generateADisk(),
NET: generateANet(),
}
}
func generateAmodel() *types.ModelInfo {
return &types.ModelInfo{
InstalledModels: []*types.InstalledModelInfo{
&types.InstalledModelInfo{
ModelId: getRandId(100),
// generateAInstallModel builds an InstalledModel fixture with an id from
// getRandId(100), a fixed disk size of 101 and both timestamps set to the
// current Unix time.
func generateAInstallModel() *types.InstalledModel {
	model := &types.InstalledModel{DiskSize: 101}
	model.ModelID = getRandId(100)
	model.InstalledTime = time.Now().Unix()
	model.LastRunTime = time.Now().Unix()
	return model
}
// generateARunningModel builds a RunningModel fixture: an id from
// getRandId(100), a GPU sequence in [0, 3), a GPU RAM figure from
// generateAGpuRam, both timestamps set to the current Unix time, and run
// count / wait time each drawn from [0, 100).
func generateARunningModel() *types.RunningModel {
	// The helpers are called in the same order as the fields are declared so
	// the sequence of random draws is unchanged.
	id := getRandId(100)
	seq := rand.Intn(3)
	gpuRAM := generateAGpuRam()
	started := time.Now().Unix()
	lastWork := time.Now().Unix()
	runs := rand.Intn(100)
	wait := rand.Intn(100)

	return &types.RunningModel{
		ModelID:       id,
		GpuSeq:        seq,
		GpuRAM:        gpuRAM,
		StartedTime:   started,
		LastWorkTime:  lastWork,
		TotalRunCount: runs,
		WaitTime:      wait,
	}
}
func generateAModel() *types.ModelInfo {
return &types.ModelInfo{
InstalledModels: []*types.InstalledModel{
generateAInstallModel(),
generateAInstallModel(),
generateAInstallModel(),
},
&types.InstalledModelInfo{
ModelId: getRandId(100),
DiskSize: 44,
},
},
RunningModels: []*types.RunningModelInfo{
&types.RunningModelInfo{
ModelId: getRandId(100),
StartedTime: uint64((time.Now().Add(-time.Hour)).Unix()),
LatestTime: uint64(time.Now().Add(-time.Minute * 20).Unix()),
RunCount: 101,
GpuRamUsed: uint64(rand.Intn(20) + 10),
},
RunningModels: []*types.RunningModel{
generateARunningModel(),
},
}
}
func BenchmarkGenerateWorker(b *testing.B) {
......@@ -264,7 +301,7 @@ func BenchmarkDbWorker_UpdateModel(b *testing.B) {
defer db.client.Disconnect(context.Background())
for i := 0; i < b.N; i++ {
idx := rand.Intn(len(idlist))
nresource := generateAmodel()
nresource := generateAModel()
if err := db.UpdateModel(context.Background(), idlist[idx], nresource); err != nil {
panic(fmt.Sprintf("update worker failed with err:%s", err))
}
......@@ -282,7 +319,7 @@ func BenchmarkDbWorker_UpdateModel_Parallel(b *testing.B) {
b.RunParallel(func(pb *testing.PB) {
for pb.Next() {
idx := rand.Intn(len(idlist))
nresource := generateAmodel()
nresource := generateAModel()
if err := db.UpdateModel(context.Background(), idlist[idx], nresource); err != nil {
panic(fmt.Sprintf("update worker failed with err:%s", err))
}
......@@ -326,81 +363,7 @@ func BenchmarkDbWorker_UpdateNodeInfo_Parallel(b *testing.B) {
})
}
// BenchmarkDbWorker_FindWorkerByRunningModelId measures sequential lookups by
// running-model id against the MongoDB instance at localhost:27017. A failed
// lookup panics; an empty result is only logged.
func BenchmarkDbWorker_FindWorkerByRunningModelId(b *testing.B) {
	ctx := context.Background()

	client, err := ConnectMongoDB("mongodb://localhost:27017")
	if err != nil {
		log.Fatal(err)
	}
	db := NewDBWorker(client, database, collection)
	defer db.client.Disconnect(ctx)

	// Exclude connection setup from the measurement.
	b.ResetTimer()
	for n := 0; n < b.N; n++ {
		id := getRandId(100)
		found, findErr := db.FindWorkerByRunningModelId(ctx, id)
		if findErr != nil {
			panic(fmt.Sprintf("find worker failed with err:%s", findErr))
		}
		if len(found) == 0 {
			b.Logf("FindWorkerByRunningModelId find %d with id %s\n", len(found), id)
		}
	}
}
// BenchmarkDbWorker_FindWorkerByRunningModelId_Parallel measures concurrent
// lookups by running-model id against the MongoDB instance at
// localhost:27017. A failed lookup panics; an empty result is only logged.
func BenchmarkDbWorker_FindWorkerByRunningModelId_Parallel(b *testing.B) {
	ctx := context.Background()

	client, err := ConnectMongoDB("mongodb://localhost:27017")
	if err != nil {
		log.Fatal(err)
	}
	db := NewDBWorker(client, database, collection)
	defer db.client.Disconnect(ctx)

	b.RunParallel(func(pb *testing.PB) {
		for pb.Next() {
			id := getRandId(100)
			found, findErr := db.FindWorkerByRunningModelId(ctx, id)
			if findErr != nil {
				panic(fmt.Sprintf("find worker failed with err:%s", findErr))
			}
			if len(found) == 0 {
				b.Logf("FindWorkerByRunningModelId find %d with id %s\n", len(found), id)
			}
		}
	})
}
// BenchmarkDbWorker_FindWorkerByRunningModelIdWithLimit measures sequential
// lookups by running-model id, capped at 10 results per query, against the
// MongoDB instance at localhost:27017. A failed lookup panics; an empty
// result is only logged.
func BenchmarkDbWorker_FindWorkerByRunningModelIdWithLimit(b *testing.B) {
	client, err := ConnectMongoDB("mongodb://localhost:27017")
	if err != nil {
		log.Fatal(err)
	}
	db := NewDBWorker(client, database, collection)
	defer db.client.Disconnect(context.Background())

	// Exclude connection setup from the measurement.
	b.ResetTimer()
	for i := 0; i < b.N; i++ {
		runningModelId := getRandId(100)
		w, err := db.FindWorkerByRunningModelIdWithLimit(context.Background(), runningModelId, 10)
		if err != nil {
			panic(fmt.Sprintf("find worker failed with err:%s", err))
		}
		if len(w) == 0 {
			// Label the log with the method actually called so empty results
			// are not attributed to the unlimited lookup benchmark.
			b.Logf("FindWorkerByRunningModelIdWithLimit find %d with id %s\n", len(w), runningModelId)
		}
	}
}
// BenchmarkDbWorker_FindWorkerByRunningModelIdWithLimit_Parallel measures
// concurrent lookups by running-model id, capped at 10 results per query,
// against the MongoDB instance at localhost:27017. A failed lookup panics; an
// empty result is only logged.
func BenchmarkDbWorker_FindWorkerByRunningModelIdWithLimit_Parallel(b *testing.B) {
	client, err := ConnectMongoDB("mongodb://localhost:27017")
	if err != nil {
		log.Fatal(err)
	}
	db := NewDBWorker(client, database, collection)
	defer db.client.Disconnect(context.Background())

	b.RunParallel(func(pb *testing.PB) {
		for pb.Next() {
			runningModelId := getRandId(100)
			w, err := db.FindWorkerByRunningModelIdWithLimit(context.Background(), runningModelId, 10)
			if err != nil {
				panic(fmt.Sprintf("find worker failed with err:%s", err))
			}
			if len(w) == 0 {
				// Label the log with the method actually called so empty
				// results are not attributed to the unlimited lookup benchmark.
				b.Logf("FindWorkerByRunningModelIdWithLimit find %d with id %s\n", len(w), runningModelId)
			}
		}
	})
}
func BenchmarkDbWorker_FindWorkerByInstalledModelId(b *testing.B) {
func BenchmarkDbWorker_FindWorkerByInstallModelAndSortByGpuRam(b *testing.B) {
client, err := ConnectMongoDB("mongodb://localhost:27017")
if err != nil {
log.Fatal(err)
......@@ -410,15 +373,17 @@ func BenchmarkDbWorker_FindWorkerByInstalledModelId(b *testing.B) {
b.ResetTimer()
for i := 0; i < b.N; i++ {
installedModelId := getRandId(100)
if w, err := db.FindWorkerByInstalledModelId(context.Background(), installedModelId); err != nil {
performance := generateAGpuPerformance()
ram := generateAGpuRam()
if w, err := db.FindWorkerByInstallModelAndSortByGpuRam(context.Background(), installedModelId, performance, ram, 10); err != nil {
panic(fmt.Sprintf("find worker failed with err:%s", err))
} else if len(w) == 0 {
b.Logf("FindWorkerByInstalledModelId find %d with id %s\n", len(w), installedModelId)
b.Logf("FindWorkerByInstallModelAndSortByGpuRam find %d with id %s\n", len(w), installedModelId)
}
}
}
func BenchmarkDbWorker_FindWorkerByInstalledModelId_Parallel(b *testing.B) {
func BenchmarkDbWorker_FindWorkerByInstallModelAndSortByGpuRam_Parallel(b *testing.B) {
client, err := ConnectMongoDB("mongodb://localhost:27017")
if err != nil {
log.Fatal(err)
......@@ -428,16 +393,18 @@ func BenchmarkDbWorker_FindWorkerByInstalledModelId_Parallel(b *testing.B) {
b.RunParallel(func(pb *testing.PB) {
for pb.Next() {
installedModelId := getRandId(100)
if w, err := db.FindWorkerByInstalledModelId(context.Background(), installedModelId); err != nil {
performance := generateAGpuPerformance()
ram := generateAGpuRam()
if w, err := db.FindWorkerByInstallModelAndSortByGpuRam(context.Background(), installedModelId, performance, ram, 10); err != nil {
panic(fmt.Sprintf("find worker failed with err:%s", err))
} else if len(w) == 0 {
b.Logf("FindWorkerByInstalledModelId find %d with id %s\n", len(w), installedModelId)
b.Logf("FindWorkerByInstallModelAndSortByGpuRam find %d with id %s\n", len(w), installedModelId)
}
}
})
}
func BenchmarkDbWorker_FindWorkerByGpuRam(b *testing.B) {
func BenchmarkDbWorker_FindWorkerByRunningModelAndSortByWaitTime(b *testing.B) {
client, err := ConnectMongoDB("mongodb://localhost:27017")
if err != nil {
log.Fatal(err)
......@@ -446,16 +413,16 @@ func BenchmarkDbWorker_FindWorkerByGpuRam(b *testing.B) {
defer db.client.Disconnect(context.Background())
b.ResetTimer()
for i := 0; i < b.N; i++ {
ram := generateAGpuRam()
if w, err := db.FindWorkerByGpuRam(context.Background(), ram); err != nil {
runningModelId := getRandId(100)
if w, err := db.FindWorkerByRunningModelAndSortByWaitTime(context.Background(), runningModelId, 10); err != nil {
panic(fmt.Sprintf("find worker failed with err:%s", err))
} else if len(w) == 0 {
b.Logf("FindWorkerByGpuRam find %d with ram %d\n", len(w), ram)
b.Logf("FindWorkerByRunningModelAndSortByWaitTime find %d with id %s\n", len(w), runningModelId)
}
}
}
func BenchmarkDbWorker_FindWorkerByGpuRam_Parallel(b *testing.B) {
func BenchmarkDbWorker_FindWorkerByRunningModelAndSortByWaitTime_Parallel(b *testing.B) {
client, err := ConnectMongoDB("mongodb://localhost:27017")
if err != nil {
log.Fatal(err)
......@@ -464,11 +431,11 @@ func BenchmarkDbWorker_FindWorkerByGpuRam_Parallel(b *testing.B) {
defer db.client.Disconnect(context.Background())
b.RunParallel(func(pb *testing.PB) {
for pb.Next() {
ram := generateAGpuRam()
if w, err := db.FindWorkerByGpuRam(context.Background(), ram); err != nil {
runningModelId := getRandId(100)
if w, err := db.FindWorkerByRunningModelAndSortByWaitTime(context.Background(), runningModelId, 10); err != nil {
panic(fmt.Sprintf("find worker failed with err:%s", err))
} else if len(w) == 0 {
b.Logf("FindWorkerByGpuRam find %d with ram %d\n", len(w), ram)
b.Logf("FindWorkerByRunningModelAndSortByWaitTime find %d with id %s\n", len(w), runningModelId)
}
}
})
......
......@@ -6,65 +6,71 @@ type NodeInfo struct {
DeviceIp string `bson:"device_ip" json:"device_ip"`
}
type InstalledModelInfo struct {
ModelId string `bson:"model_id" json:"model_id"`
DiskSize uint64 `bson:"disk_size" json:"disk_size"`
type InstalledModel struct {
ModelID string `bson:"model_id" json:"model_id"`
DiskSize int `bson:"disk_size" json:"disk_size"`
InstalledTime int64 `bson:"installed_time" json:"installed_time"`
LastRunTime int64 `bson:"last_run_time" json:"last_run_time"`
}
type RunningModelInfo struct {
ModelId string `bson:"model_id" json:"model_id"`
StartedTime uint64 `bson:"started_time" json:"started_time"`
LatestTime uint64 `bson:"latest_time" json:"latest_time"`
RunCount uint64 `bson:"run_count" json:"run_count"`
GpuRamUsed uint64 `bson:"gpu_ram" json:"gpu_ram"`
type RunningModel struct {
ModelID string `bson:"model_id" json:"model_id"`
GpuSeq int `bson:"gpu_seq" json:"gpu_seq"`
GpuRAM int `bson:"gpu_ram" json:"gpu_ram"`
StartedTime int64 `bson:"started_time" json:"started_time"`
LastWorkTime int64 `bson:"last_work_time" json:"last_work_time"`
TotalRunCount int `bson:"total_run_count" json:"total_run_count"`
WaitTime int `bson:"wait_time" json:"wait_time"`
}
type ModelInfo struct {
InstalledModels []*InstalledModelInfo `bson:"installed_models" json:"installed_models"`
RunningModels []*RunningModelInfo `bson:"running_models" json:"running_models"`
InstalledModels []*InstalledModel `bson:"installed_models" json:"installed_models"`
RunningModels []*RunningModel `bson:"running_models" json:"running_models"`
}
// DeviceInfo describes one device by type, model, parameter string and power
// figure. Each field is serialized under the same snake_case key in BSON and
// JSON.
type DeviceInfo struct {
// DeviceType is the kind of device, as a free-form string.
DeviceType string `bson:"device_type" json:"device_type"`
// DeviceModel is the device's model name.
DeviceModel string `bson:"device_model" json:"device_model"`
// DeviceParam is a parameter string for the device.
// NOTE(review): format is not visible here — confirm with the producer.
DeviceParam string `bson:"device_param" json:"device_param"`
// DevicePower is the device's power figure.
// NOTE(review): unit is not visible here — confirm.
DevicePower uint64 `bson:"device_power" json:"device_power"`
}
type DeviceUsage struct {
DeviceType string `bson:"device_type" json:"device_type"`
DeviceUsage uint64 `bson:"device_usage" json:"device_usage"`
type HardwareInfo struct {
CPU *CpuInfo `bson:"CPU" json:"CPU"`
GPU []*GpuInfo `bson:"GPU" json:"GPU"`
RAM *RamInfo `bson:"RAM" json:"RAM"`
DISK *DiskInfo `bson:"DISK" json:"DISK"`
NET *NetInfo `bson:"NET" json:"NET"`
}
// CpuInfo describes a worker's CPU. Every field is serialized under the same
// snake_case key in both BSON and JSON.
type CpuInfo struct {
	// Model is the CPU model name, e.g. "Intel(R) Core(TM) i7-8700 CPU @ 3.20GHz".
	// The JSON key is "model" with no trailing space: encoding/json treats a
	// space inside the tag as part of the key, which would otherwise emit
	// "model " and fail to match "model" on decode.
	Model string `bson:"model" json:"model"`
	// Number is the CPU count.
	// NOTE(review): presumably physical packages — confirm.
	Number int `bson:"number" json:"number"`
	// Cores is the core count.
	Cores int `bson:"cores" json:"cores"`
	// Threads is the hardware thread count.
	Threads int `bson:"threads" json:"threads"`
	// Usage is the current utilisation.
	// NOTE(review): looks like a percentage — confirm.
	Usage int `bson:"usage" json:"usage"`
}
// GpuInfo describes one GPU of a worker. Every field is serialized under the
// same snake_case key in both BSON and JSON.
type GpuInfo struct {
	// Seq is the GPU's sequence number on the worker.
	Seq int `bson:"seq" json:"seq"`
	// UUID identifies the GPU.
	UUID string `bson:"uuid" json:"uuid"`
	// Model is the GPU model name. The JSON key is "model" with no trailing
	// space: encoding/json treats a space inside the tag as part of the key.
	Model string `bson:"model" json:"model"`
	// Performance is the GPU's performance score.
	// NOTE(review): scale is not visible here — confirm.
	Performance int `bson:"performance" json:"performance"`
	// PowerRating is the GPU's rated power.
	// NOTE(review): unit is not visible here — confirm.
	PowerRating int `bson:"power_rating" json:"power_rating"`
	// MemTotal is the GPU's total memory.
	// NOTE(review): presumably bytes — confirm.
	MemTotal int `bson:"mem_total" json:"mem_total"`
	// MemFree is the GPU's currently free memory, in the same unit as MemTotal.
	MemFree int `bson:"mem_free" json:"mem_free"`
	// Usage is the current utilisation.
	// NOTE(review): looks like a percentage — confirm.
	Usage int `bson:"usage" json:"usage"`
	// Temp is the current temperature. The JSON key is "temp" with no
	// trailing space, for the same reason as Model.
	Temp int `bson:"temp" json:"temp"`
	// PowerRt is the current power draw.
	// NOTE(review): "Rt" presumably means real-time — confirm.
	PowerRt int `bson:"power_rt" json:"power_rt"`
}
type RamInfo struct {
Size uint `bson:"size" json:"size"`
Usage uint `bson:"usage" json:"usage"`
Total int `bson:"total" json:"total"`
Free int `bson:"free" json:"free"`
}
type DiskInfo struct {
Size uint `bson:"size" json:"size"`
Usage uint `bson:"usage" json:"usage"`
Total int `bson:"total" json:"total"`
Free int `bson:"free" json:"free"`
}
type HardwareInfo struct {
Cpu []*CpuInfo `bson:"cpu" json:"cpu"`
Gpu []*GpuInfo `bson:"gpu" json:"gpu"`
Ram *RamInfo `bson:"ram" json:"ram"`
Disk *DiskInfo `bson:"disk" json:"disk"`
// NetInfo describes a worker's network interface. Each field is serialized
// under the same lowercase key in BSON and JSON.
type NetInfo struct {
// IP is the interface address as a string (the test fixtures use dotted IPv4).
IP string `bson:"ip" json:"ip"`
// Mac is the hardware address as a string (the test fixtures use colon-separated hex).
Mac string `bson:"mac" json:"mac"`
// Bandwidth is the link bandwidth.
// NOTE(review): unit is not visible here — confirm.
Bandwidth int `bson:"bandwidth" json:"bandwidth"`
}
type WorkerInfo struct {
......@@ -73,3 +79,8 @@ type WorkerInfo struct {
ModelInofs *ModelInfo `bson:"model_infos" json:"model_infos"`
Hardware *HardwareInfo `bson:"hardware" json:"hardware"`
}
// WorkerModelInfo pairs a worker id with a model id. Each field is serialized
// under the same snake_case key in BSON and JSON.
type WorkerModelInfo struct {
// WorkerId is the id of the worker.
WorkerId string `bson:"worker_id" json:"worker_id"`
// ModelId is the id of the model associated with that worker.
ModelId string `bson:"model_id" json:"model_id"`
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment