Commit 694be5e5 authored by vicotor's avatar vicotor

add test and update

parent 21dadcd1
...@@ -242,7 +242,7 @@ func TestWorkerInstalledOperator_FindWorkerByModelIdAndGpuMem(t *testing.T) { ...@@ -242,7 +242,7 @@ func TestWorkerInstalledOperator_FindWorkerByModelIdAndGpuMem(t *testing.T) {
// defer rundb.client.Disconnect(context.Background()) // defer rundb.client.Disconnect(context.Background())
// //
// for i := 0; i < count; i++ { // for i := 0; i < count; i++ {
// worker := generateAWroker() // worker := generateAWorker()
// infodb.InsertWorker(context.Background(), worker) // infodb.InsertWorker(context.Background(), worker)
// for _, installed := range worker.Models.InstalledModels { // for _, installed := range worker.Models.InstalledModels {
// id, _ := strconv.Atoi(installed.ModelID) // id, _ := strconv.Atoi(installed.ModelID)
......
...@@ -4,6 +4,7 @@ import ( ...@@ -4,6 +4,7 @@ import (
"context" "context"
"github.com/odysseus/mogo/types" "github.com/odysseus/mogo/types"
"go.mongodb.org/mongo-driver/bson" "go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/bson/primitive"
"go.mongodb.org/mongo-driver/mongo" "go.mongodb.org/mongo-driver/mongo"
"go.mongodb.org/mongo-driver/mongo/options" "go.mongodb.org/mongo-driver/mongo/options"
) )
...@@ -28,6 +29,17 @@ func NewDBWorker(client *mongo.Client, database string) *WorkerInfoOperator { ...@@ -28,6 +29,17 @@ func NewDBWorker(client *mongo.Client, database string) *WorkerInfoOperator {
} }
} }
func (d *WorkerInfoOperator) Clear() {
d.col.DeleteMany(context.Background(), bson.M{})
}
func (d *WorkerInfoOperator) Get(ctx context.Context, id string) (*WorkerInfo, error) {
var worker WorkerInfo
oid, _ := primitive.ObjectIDFromHex(id)
err := d.col.FindOne(ctx, bson.M{"_id": oid}).Decode(&worker)
return &worker, err
}
func (d *WorkerInfoOperator) GetAllWorkerId(ctx context.Context) ([]string, error) { func (d *WorkerInfoOperator) GetAllWorkerId(ctx context.Context) ([]string, error) {
filter := bson.D{} filter := bson.D{}
opts := options.Find().SetProjection(bson.D{{"worker_id", 1}}) opts := options.Find().SetProjection(bson.D{{"worker_id", 1}})
...@@ -52,8 +64,16 @@ func (d *WorkerInfoOperator) InsertWorker(ctx context.Context, worker *WorkerInf ...@@ -52,8 +64,16 @@ func (d *WorkerInfoOperator) InsertWorker(ctx context.Context, worker *WorkerInf
return d.col.InsertOne(ctx, worker) return d.col.InsertOne(ctx, worker)
} }
func (d *WorkerInfoOperator) InsertMany(ctx context.Context, workers []*WorkerInfo) (*mongo.InsertManyResult, error) {
var docs []interface{}
for _, worker := range workers {
docs = append(docs, worker)
}
return d.col.InsertMany(ctx, docs)
}
func (d *WorkerInfoOperator) DeleteByWorkerId(ctx context.Context, workerid string) (int, error) { func (d *WorkerInfoOperator) DeleteByWorkerId(ctx context.Context, workerid string) (int, error) {
res, err := d.col.DeleteOne(ctx, bson.M{"worker_id": workerid}) res, err := d.col.DeleteMany(ctx, bson.M{"worker_id": workerid})
return int(res.DeletedCount), err return int(res.DeletedCount), err
} }
......
...@@ -20,11 +20,7 @@ import ( ...@@ -20,11 +20,7 @@ import (
var ( var (
maxModelId = 10000 maxModelId = 10000
idlist = make([]string, 0, 1000000) idlist = make([]string, 0, 1000000)
//workers = make([]*DbWorkerInfo, 0, 1000000)
database = "test" database = "test"
collection = "workers"
workerRunningCollection = "worker_running"
workerInstalledCollection = "worker_installed"
) )
func ConnectMongoDB() (*mongo.Client, error) { func ConnectMongoDB() (*mongo.Client, error) {
...@@ -58,7 +54,7 @@ func initdata(client *mongo.Client) []string { ...@@ -58,7 +54,7 @@ func initdata(client *mongo.Client) []string {
// Insert 1,000,000 DbWorkerInfo to operator // Insert 1,000,000 DbWorkerInfo to operator
for i := 0; i < 1000; i++ { for i := 0; i < 1000; i++ {
worker := generateAWroker() worker := generateAWorker()
result, err := db.InsertWorker(context.Background(), worker) result, err := db.InsertWorker(context.Background(), worker)
if err != nil { if err != nil {
panic(fmt.Sprintf("insert worker failed with err:%s", err)) panic(fmt.Sprintf("insert worker failed with err:%s", err))
...@@ -99,7 +95,7 @@ func getRandIdInt(max int) int { ...@@ -99,7 +95,7 @@ func getRandIdInt(max int) int {
return rand.Intn(max) + 1 return rand.Intn(max) + 1
} }
func generateAWroker() *WorkerInfo { func generateAWorker() *WorkerInfo {
return &WorkerInfo{ return &WorkerInfo{
WorkerId: uuid.NewString(), WorkerId: uuid.NewString(),
NodeInfo: generateANodeInfo(), NodeInfo: generateANodeInfo(),
...@@ -253,9 +249,177 @@ func generateAModel() *types.ModelInfo { ...@@ -253,9 +249,177 @@ func generateAModel() *types.ModelInfo {
return m return m
} }
func TestWorkerInfoOperator_InsertWorker(t *testing.T) {
client, err := ConnectMongoDB()
if err != nil {
log.Fatal(err)
}
db := NewDBWorker(client, database)
defer db.client.Disconnect(context.Background())
worker := generateAWorker()
if _, err := db.InsertWorker(context.Background(), worker); err != nil {
t.Errorf("insert worker failed with err:%s", err)
}
}
func TestWorkerInfoOperator_GetAllWorkerId(t *testing.T) {
client, err := ConnectMongoDB()
if err != nil {
log.Fatal(err)
}
db := NewDBWorker(client, database)
defer db.client.Disconnect(context.Background())
db.Clear()
// insert first
ws := make([]*WorkerInfo, 0)
for i := 0; i < 10; i++ {
w := generateAWorker()
ws = append(ws, w)
}
if _, err := db.InsertMany(context.Background(), ws); err != nil {
t.Errorf("insert worker failed with err:%s", err)
}
ids, err := db.GetAllWorkerId(context.Background())
if err != nil {
t.Errorf("get all worker failed with err:%s", err)
}
if len(ids) != len(ws) {
t.Errorf("result check failed")
}
}
func TestWorkerInfoOperator_InsertMany(t *testing.T) {
client, err := ConnectMongoDB()
if err != nil {
log.Fatal(err)
}
db := NewDBWorker(client, database)
defer db.client.Disconnect(context.Background())
db.Clear()
ws := make([]*WorkerInfo, 0)
for i := 0; i < 10; i++ {
w := generateAWorker()
ws = append(ws, w)
}
if res, err := db.InsertMany(context.Background(), ws); err != nil {
t.Errorf("insert worker failed with err:%s", err)
} else {
if len(res.InsertedIDs) != len(ws) {
t.Errorf("insert worker failed with err:%s", err)
}
}
}
func TestWorkerInfoOperator_AddInstalledModels(t *testing.T) {
client, err := ConnectMongoDB()
if err != nil {
log.Fatal(err)
}
db := NewDBWorker(client, database)
defer db.client.Disconnect(context.Background())
db.Clear()
worker := &WorkerInfo{
WorkerId: uuid.NewString(),
NodeInfo: generateANodeInfo(),
Models: &types.ModelInfo{
InstalledModels: []*types.InstalledModel{
&types.InstalledModel{
ModelID: "1",
DiskSize: 101,
InstalledTime: time.Now().Unix(),
LastRunTime: time.Now().Unix(),
},
},
RunningModels: make([]*types.RunningModel, 0, 1000),
},
Hardware: generateAHardware(),
}
is, err := db.InsertWorker(context.Background(), worker)
if err != nil {
t.Errorf("insert worker failed with err:%s", err)
}
id := is.InsertedID.(primitive.ObjectID)
newModel := &types.InstalledModel{
ModelID: "2",
DiskSize: 101,
InstalledTime: time.Now().Unix(),
LastRunTime: time.Now().Unix(),
}
if err := db.AddInstalledModels(context.Background(), worker.WorkerId, []*types.InstalledModel{newModel}); err != nil {
t.Error("add installed models failed")
}
worker, err = db.Get(context.Background(), id.Hex())
if err != nil {
t.Error("get worker failed")
}
if len(worker.Models.InstalledModels) != 2 {
t.Error("add installed models failed")
}
}
func TestWorkerInfoOperator_AddRunningModels(t *testing.T) {
client, err := ConnectMongoDB()
if err != nil {
log.Fatal(err)
}
db := NewDBWorker(client, database)
defer db.client.Disconnect(context.Background())
db.Clear()
worker := &WorkerInfo{
WorkerId: uuid.NewString(),
NodeInfo: generateANodeInfo(),
Models: &types.ModelInfo{
InstalledModels: make([]*types.InstalledModel, 0, 1000),
RunningModels: []*types.RunningModel{
&types.RunningModel{
ModelID: "1",
GpuSeq: 0,
GpuRAM: 1024 * 1024 * 1024,
StartedTime: time.Now().Unix(),
LastWorkTime: time.Now().Unix(),
TotalRunCount: 100,
ExecTime: 100,
},
},
},
Hardware: generateAHardware(),
}
is, err := db.InsertWorker(context.Background(), worker)
if err != nil {
t.Errorf("insert worker failed with err:%s", err)
}
id := is.InsertedID.(primitive.ObjectID)
newModel := &types.RunningModel{
ModelID: "2",
GpuSeq: 1,
GpuRAM: 1024 * 1024 * 1024,
StartedTime: time.Now().Unix(),
LastWorkTime: time.Now().Unix(),
TotalRunCount: 100,
ExecTime: 100,
}
if err := db.AddRunningModels(context.Background(), worker.WorkerId, []*types.RunningModel{newModel}); err != nil {
t.Error("add running models failed")
}
worker, err = db.Get(context.Background(), id.Hex())
if err != nil {
t.Error("get worker failed")
}
if len(worker.Models.RunningModels) != 2 {
t.Error("add running models failed")
}
}
func BenchmarkGenerateWorker(b *testing.B) { func BenchmarkGenerateWorker(b *testing.B) {
for i := 0; i < b.N; i++ { for i := 0; i < b.N; i++ {
generateAWroker() generateAWorker()
} }
} }
...@@ -269,7 +433,7 @@ func BenchmarkDbWorker_InsertWorker(b *testing.B) { ...@@ -269,7 +433,7 @@ func BenchmarkDbWorker_InsertWorker(b *testing.B) {
b.ResetTimer() b.ResetTimer()
for i := 0; i < b.N; i++ { for i := 0; i < b.N; i++ {
b.StopTimer() b.StopTimer()
worker := generateAWroker() worker := generateAWorker()
b.StartTimer() b.StartTimer()
if _, err := db.InsertWorker(context.Background(), worker); err != nil { if _, err := db.InsertWorker(context.Background(), worker); err != nil {
panic(fmt.Sprintf("insert worker failed with err:%s", err)) panic(fmt.Sprintf("insert worker failed with err:%s", err))
...@@ -286,7 +450,7 @@ func BenchmarkDbWorker_InsertWorker_Parallel(b *testing.B) { ...@@ -286,7 +450,7 @@ func BenchmarkDbWorker_InsertWorker_Parallel(b *testing.B) {
defer db.client.Disconnect(context.Background()) defer db.client.Disconnect(context.Background())
b.RunParallel(func(pb *testing.PB) { b.RunParallel(func(pb *testing.PB) {
for pb.Next() { for pb.Next() {
worker := generateAWroker() worker := generateAWorker()
if _, err := db.InsertWorker(context.Background(), worker); err != nil { if _, err := db.InsertWorker(context.Background(), worker); err != nil {
panic(fmt.Sprintf("insert worker failed with err:%s", err)) panic(fmt.Sprintf("insert worker failed with err:%s", err))
} }
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment