Commit 0b9e36dd authored by vicotor's avatar vicotor

update workerinfo

parent 08aca3ed
...@@ -29,6 +29,19 @@ func NewDBWorker(client *mongo.Client, database string) *WorkerInfoOperator { ...@@ -29,6 +29,19 @@ func NewDBWorker(client *mongo.Client, database string) *WorkerInfoOperator {
} }
} }
func (d *WorkerInfoOperator) CreateIndex(ctx context.Context) error {
_, err := d.col.Indexes().CreateMany(ctx, []mongo.IndexModel{
{
Keys: bson.D{{"worker_id", 1}},
},
{
Keys: bson.D{{"model_infos.running_models.wait_time", 1},
{"model_infos.running_models.model_id", 1}},
},
})
return err
}
func (d *WorkerInfoOperator) Clear() { func (d *WorkerInfoOperator) Clear() {
d.col.DeleteMany(context.Background(), bson.M{}) d.col.DeleteMany(context.Background(), bson.M{})
} }
...@@ -218,9 +231,9 @@ func (d *WorkerInfoOperator) FindWorkerByInstallModelAndSortByGpuRam(ctx context ...@@ -218,9 +231,9 @@ func (d *WorkerInfoOperator) FindWorkerByInstallModelAndSortByGpuRam(ctx context
// sort by gpu ram // sort by gpu ram
findOptions := options.Find() findOptions := options.Find()
findOptions.SetLimit(int64(limit)) findOptions.SetLimit(int64(limit))
findOptions.SetSort(bson.D{{"hardware.GPU.ram", -1}}) findOptions.SetSort(bson.D{{"hardware.GPU.mem_free", -1}})
selector := bson.M{"model_infos.installed_models.model_id": modelId, "hardware.GPU.performance": bson.M{"$gte": performance}, "hardware.GPU.ram": bson.M{"$gte": ram}} selector := bson.M{"model_infos.installed_models.model_id": modelId, "hardware.GPU.performance": bson.M{"$gte": performance}, "hardware.GPU.mem_free": bson.M{"$gte": ram}}
cursor, err := d.col.Find(ctx, selector, findOptions) cursor, err := d.col.Find(ctx, selector, findOptions)
if err != nil { if err != nil {
return nil, err return nil, err
......
...@@ -13,14 +13,16 @@ import ( ...@@ -13,14 +13,16 @@ import (
"log" "log"
"math/rand" "math/rand"
"strconv" "strconv"
"sync"
"testing" "testing"
"time" "time"
) )
var ( var (
maxModelId = 10000 maxModelId = 1000
idlist = make([]string, 0, 1000000) idlist = make([]string, 0, 1000000)
database = "test" database = "test"
once = sync.Once{}
) )
func ConnectMongoDB() (*mongo.Client, error) { func ConnectMongoDB() (*mongo.Client, error) {
...@@ -39,39 +41,50 @@ func ConnectMongoDB() (*mongo.Client, error) { ...@@ -39,39 +41,50 @@ func ConnectMongoDB() (*mongo.Client, error) {
return client, nil return client, nil
} }
//func init() { func initdata(client *mongo.Client, count int, running bool, installed bool) []string {
// client, err := ConnectMongoDB()
// if err != nil {
// log.Fatal(err)
// }
// idlist = initdata(client)
//}
func initdata(client *mongo.Client) []string {
t1 := time.Now() t1 := time.Now()
db := NewDBWorker(client, database) db := NewDBWorker(client, database)
dbRunning := NewDBWorkerRunning(client, database) dbRunning := NewDBWorkerRunning(client, database)
dbInstalled := NewDBWorkerInstalled(client, database)
// Insert 1,000,000 DbWorkerInfo to operator // Insert 1,000,000 DbWorkerInfo to operator
for i := 0; i < 1000; i++ { for i := 0; i < count; i++ {
worker := generateAWorker() worker := generateAWorker()
result, err := db.InsertWorker(context.Background(), worker) result, err := db.InsertWorker(context.Background(), worker)
if err != nil { if err != nil {
panic(fmt.Sprintf("insert worker failed with err:%s", err)) panic(fmt.Sprintf("insert worker failed with err:%s", err))
} }
{ if running {
// add worker running info to dbRunning // add worker running info to dbRunning
runnings := make([]*WorkerRunningInfo, 0, len(worker.Models.RunningModels))
for _, model := range worker.Models.RunningModels { for _, model := range worker.Models.RunningModels {
id, _ := strconv.Atoi(model.ModelID) id, _ := strconv.Atoi(model.ModelID)
runningInfo := &WorkerRunningInfo{ runnings = append(runnings, &WorkerRunningInfo{
WorkerId: worker.WorkerId, WorkerId: worker.WorkerId,
ModelId: id, ModelId: id,
ExecTime: model.ExecTime, ExecTime: model.ExecTime,
} })
_, err := dbRunning.Insert(context.Background(), runningInfo) }
if err != nil { _, err = dbRunning.InsertMany(context.Background(), runnings)
panic(fmt.Sprintf("insert worker failed with err:%s", err)) if err != nil {
} panic(fmt.Sprintf("insert worker running failed with err:%s", err))
}
}
if installed {
// add worker installed info to dbInstalled
installeds := make([]*WorkerInstalledInfo, 0, len(worker.Models.InstalledModels))
for _, model := range worker.Models.InstalledModels {
id, _ := strconv.Atoi(model.ModelID)
installeds = append(installeds, &WorkerInstalledInfo{
WorkerId: worker.WorkerId,
ModelId: id,
GpuFree: 1024 * 1024 * 1024,
GpuSeq: 0,
})
}
_, err = dbInstalled.InsertMany(context.Background(), installeds)
if err != nil {
panic(fmt.Sprintf("insert worker installed failed with err:%s", err))
} }
} }
...@@ -223,6 +236,15 @@ func generateAInstallModel() *types.InstalledModel { ...@@ -223,6 +236,15 @@ func generateAInstallModel() *types.InstalledModel {
} }
} }
func generateAInstallModelWithId(id int) *types.InstalledModel {
return &types.InstalledModel{
ModelID: strconv.Itoa(id),
DiskSize: 101,
InstalledTime: time.Now().Unix(),
LastRunTime: time.Now().Unix(),
}
}
func generateARunningModel() *types.RunningModel { func generateARunningModel() *types.RunningModel {
return &types.RunningModel{ return &types.RunningModel{
ModelID: getRandId(maxModelId), ModelID: getRandId(maxModelId),
...@@ -234,13 +256,24 @@ func generateARunningModel() *types.RunningModel { ...@@ -234,13 +256,24 @@ func generateARunningModel() *types.RunningModel {
ExecTime: rand.Intn(100), ExecTime: rand.Intn(100),
} }
} }
func generateARunningModelWithId(id int) *types.RunningModel {
return &types.RunningModel{
ModelID: strconv.Itoa(id),
GpuSeq: rand.Intn(3),
GpuRAM: generateAGpuRam(),
StartedTime: time.Now().Unix(),
LastWorkTime: time.Now().Unix(),
TotalRunCount: rand.Intn(100),
ExecTime: rand.Intn(100),
}
}
func generateAModel() *types.ModelInfo { func generateAModel() *types.ModelInfo {
m := &types.ModelInfo{ m := &types.ModelInfo{
InstalledModels: make([]*types.InstalledModel, 0, 1000), InstalledModels: make([]*types.InstalledModel, 0, 1000),
RunningModels: make([]*types.RunningModel, 0, 1000), RunningModels: make([]*types.RunningModel, 0, 1000),
} }
for i := 0; i < 100; i++ { for i := 0; i < 200; i++ {
m.InstalledModels = append(m.InstalledModels, generateAInstallModel()) m.InstalledModels = append(m.InstalledModels, generateAInstallModel())
if len(m.RunningModels) < 100 { if len(m.RunningModels) < 100 {
m.RunningModels = append(m.RunningModels, generateARunningModel()) m.RunningModels = append(m.RunningModels, generateARunningModel())
...@@ -479,6 +512,13 @@ func BenchmarkDbWorker_InsertWorker(b *testing.B) { ...@@ -479,6 +512,13 @@ func BenchmarkDbWorker_InsertWorker(b *testing.B) {
log.Fatal(err) log.Fatal(err)
} }
db := NewDBWorker(client, database) db := NewDBWorker(client, database)
once.Do(func() {
db.Clear()
if err := db.CreateIndex(context.Background()); err != nil {
panic(fmt.Sprintf("create index failed with err:%s", err))
}
})
defer db.client.Disconnect(context.Background()) defer db.client.Disconnect(context.Background())
b.ResetTimer() b.ResetTimer()
for i := 0; i < b.N; i++ { for i := 0; i < b.N; i++ {
...@@ -497,6 +537,12 @@ func BenchmarkDbWorker_InsertWorker_Parallel(b *testing.B) { ...@@ -497,6 +537,12 @@ func BenchmarkDbWorker_InsertWorker_Parallel(b *testing.B) {
log.Fatal(err) log.Fatal(err)
} }
db := NewDBWorker(client, database) db := NewDBWorker(client, database)
once.Do(func() {
db.Clear()
if err := db.CreateIndex(context.Background()); err != nil {
panic(fmt.Sprintf("create index failed with err:%s", err))
}
})
defer db.client.Disconnect(context.Background()) defer db.client.Disconnect(context.Background())
b.RunParallel(func(pb *testing.PB) { b.RunParallel(func(pb *testing.PB) {
for pb.Next() { for pb.Next() {
...@@ -513,8 +559,18 @@ func BenchmarkDbWorker_UpdateHardware(b *testing.B) { ...@@ -513,8 +559,18 @@ func BenchmarkDbWorker_UpdateHardware(b *testing.B) {
if err != nil { if err != nil {
log.Fatal(err) log.Fatal(err)
} }
b.StopTimer()
db := NewDBWorker(client, database) db := NewDBWorker(client, database)
defer db.client.Disconnect(context.Background()) defer db.client.Disconnect(context.Background())
once.Do(func() {
db.Clear()
if err := db.CreateIndex(context.Background()); err != nil {
panic(fmt.Sprintf("create index failed with err:%s", err))
}
idlist = initdata(client, 10000, false, false)
})
b.StartTimer()
b.ResetTimer() b.ResetTimer()
for i := 0; i < b.N; i++ { for i := 0; i < b.N; i++ {
...@@ -535,13 +591,26 @@ func BenchmarkDbWorker_UpdateHardware_Parallel(b *testing.B) { ...@@ -535,13 +591,26 @@ func BenchmarkDbWorker_UpdateHardware_Parallel(b *testing.B) {
if err != nil { if err != nil {
log.Fatal(err) log.Fatal(err)
} }
b.StopTimer()
db := NewDBWorker(client, database) db := NewDBWorker(client, database)
defer db.client.Disconnect(context.Background()) defer db.client.Disconnect(context.Background())
once.Do(func() {
db.Clear()
if err := db.CreateIndex(context.Background()); err != nil {
panic(fmt.Sprintf("create index failed with err:%s", err))
}
idlist = initdata(client, 10000, false, false)
})
b.StartTimer()
b.ResetTimer()
b.RunParallel(func(pb *testing.PB) { b.RunParallel(func(pb *testing.PB) {
for pb.Next() { for pb.Next() {
b.StopTimer()
idx := rand.Intn(len(idlist)) idx := rand.Intn(len(idlist))
nhardware := generateAHardware() nhardware := generateAHardware()
b.StartTimer()
if err := db.UpdateHardware(context.Background(), idlist[idx], nhardware); err != nil { if err := db.UpdateHardware(context.Background(), idlist[idx], nhardware); err != nil {
panic(fmt.Sprintf("update worker failed with err:%s", err)) panic(fmt.Sprintf("update worker failed with err:%s", err))
} }
...@@ -554,17 +623,29 @@ func BenchmarkDbWorker_FindWorkerByInstallModelAndSortByGpuRam(b *testing.B) { ...@@ -554,17 +623,29 @@ func BenchmarkDbWorker_FindWorkerByInstallModelAndSortByGpuRam(b *testing.B) {
if err != nil { if err != nil {
log.Fatal(err) log.Fatal(err)
} }
b.StopTimer()
db := NewDBWorker(client, database) db := NewDBWorker(client, database)
defer db.client.Disconnect(context.Background()) defer db.client.Disconnect(context.Background())
once.Do(func() {
db.Clear()
if err := db.CreateIndex(context.Background()); err != nil {
panic(fmt.Sprintf("create index failed with err:%s", err))
}
idlist = initdata(client, 10000, false, false)
})
b.StartTimer()
b.ResetTimer() b.ResetTimer()
for i := 0; i < b.N; i++ { for i := 0; i < b.N; i++ {
b.StopTimer()
installedModelId := getRandId(maxModelId) installedModelId := getRandId(maxModelId)
performance := generateAGpuPerformance() performance := generateAGpuPerformance()
ram := generateAGpuRam() ram := int64(100 * 1024) //generateAGpuRam()
b.StartTimer()
if w, err := db.FindWorkerByInstallModelAndSortByGpuRam(context.Background(), installedModelId, performance, ram, 10); err != nil { if w, err := db.FindWorkerByInstallModelAndSortByGpuRam(context.Background(), installedModelId, performance, ram, 10); err != nil {
panic(fmt.Sprintf("find worker failed with err:%s", err)) panic(fmt.Sprintf("find worker failed with err:%s", err))
} else if len(w) == 0 { } else if len(w) == 0 {
b.Logf("FindWorkerByInstallModelAndSortByGpuRam find %d with id %s\n", len(w), installedModelId) //b.Logf("FindWorkerByInstallModelAndSortByGpuRam find %d with id %s\n", len(w), installedModelId)
} }
} }
} }
...@@ -574,17 +655,32 @@ func BenchmarkDbWorker_FindWorkerByInstallModelAndSortByGpuRam_Parallel(b *testi ...@@ -574,17 +655,32 @@ func BenchmarkDbWorker_FindWorkerByInstallModelAndSortByGpuRam_Parallel(b *testi
if err != nil { if err != nil {
log.Fatal(err) log.Fatal(err)
} }
b.StopTimer()
db := NewDBWorker(client, database) db := NewDBWorker(client, database)
defer db.client.Disconnect(context.Background()) defer db.client.Disconnect(context.Background())
once.Do(func() {
db.Clear()
if err := db.CreateIndex(context.Background()); err != nil {
panic(fmt.Sprintf("create index failed with err:%s", err))
}
idlist = initdata(client, 10000, false, false)
})
b.StartTimer()
b.ResetTimer()
b.RunParallel(func(pb *testing.PB) { b.RunParallel(func(pb *testing.PB) {
for pb.Next() { for pb.Next() {
b.StopTimer()
installedModelId := getRandId(maxModelId) installedModelId := getRandId(maxModelId)
performance := generateAGpuPerformance() performance := generateAGpuPerformance()
ram := generateAGpuRam() ram := int64(100 * 1024) //generateAGpuRam()
b.StartTimer()
if w, err := db.FindWorkerByInstallModelAndSortByGpuRam(context.Background(), installedModelId, performance, ram, 10); err != nil { if w, err := db.FindWorkerByInstallModelAndSortByGpuRam(context.Background(), installedModelId, performance, ram, 10); err != nil {
panic(fmt.Sprintf("find worker failed with err:%s", err)) panic(fmt.Sprintf("find worker failed with err:%s", err))
} else if len(w) == 0 { } else if len(w) == 0 {
b.Logf("FindWorkerByInstallModelAndSortByGpuRam find %d with id %s\n", len(w), installedModelId) //b.Logf("FindWorkerByInstallModelAndSortByGpuRam find %d with id %s\n", len(w), installedModelId)
} else {
} }
} }
}) })
...@@ -595,11 +691,23 @@ func BenchmarkDbWorker_FindWorkerByRunningModelAndSortByWaitTime(b *testing.B) { ...@@ -595,11 +691,23 @@ func BenchmarkDbWorker_FindWorkerByRunningModelAndSortByWaitTime(b *testing.B) {
if err != nil { if err != nil {
log.Fatal(err) log.Fatal(err)
} }
b.StopTimer()
db := NewDBWorker(client, database) db := NewDBWorker(client, database)
defer db.client.Disconnect(context.Background()) defer db.client.Disconnect(context.Background())
once.Do(func() {
db.Clear()
if err := db.CreateIndex(context.Background()); err != nil {
panic(fmt.Sprintf("create index failed with err:%s", err))
}
idlist = initdata(client, 1000, false, false)
})
b.StartTimer()
b.ResetTimer() b.ResetTimer()
for i := 0; i < b.N; i++ { for i := 0; i < b.N; i++ {
b.StopTimer()
runningModelId := getRandId(maxModelId) runningModelId := getRandId(maxModelId)
b.StartTimer()
if w, err := db.FindWorkerByRunningModelAndSortByWaitTime(context.Background(), runningModelId, 10); err != nil { if w, err := db.FindWorkerByRunningModelAndSortByWaitTime(context.Background(), runningModelId, 10); err != nil {
panic(fmt.Sprintf("find worker failed with err:%s", err)) panic(fmt.Sprintf("find worker failed with err:%s", err))
} else if len(w) == 0 { } else if len(w) == 0 {
...@@ -613,11 +721,24 @@ func BenchmarkDbWorker_FindWorkerByRunningModelAndSortByWaitTime_Parallel(b *tes ...@@ -613,11 +721,24 @@ func BenchmarkDbWorker_FindWorkerByRunningModelAndSortByWaitTime_Parallel(b *tes
if err != nil { if err != nil {
log.Fatal(err) log.Fatal(err)
} }
b.StopTimer()
db := NewDBWorker(client, database) db := NewDBWorker(client, database)
defer db.client.Disconnect(context.Background()) defer db.client.Disconnect(context.Background())
once.Do(func() {
db.Clear()
if err := db.CreateIndex(context.Background()); err != nil {
panic(fmt.Sprintf("create index failed with err:%s", err))
}
idlist = initdata(client, 1000, false, false)
})
b.StartTimer()
b.ResetTimer()
b.RunParallel(func(pb *testing.PB) { b.RunParallel(func(pb *testing.PB) {
for pb.Next() { for pb.Next() {
b.StopTimer()
runningModelId := getRandId(maxModelId) runningModelId := getRandId(maxModelId)
b.StartTimer()
if w, err := db.FindWorkerByRunningModelAndSortByWaitTime(context.Background(), runningModelId, 10); err != nil { if w, err := db.FindWorkerByRunningModelAndSortByWaitTime(context.Background(), runningModelId, 10); err != nil {
panic(fmt.Sprintf("find worker failed with err:%s", err)) panic(fmt.Sprintf("find worker failed with err:%s", err))
} else if len(w) == 0 { } else if len(w) == 0 {
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment