Commit 116fa5b6 authored by vicotor's avatar vicotor

add more interface

parent dae37c71
......@@ -12,6 +12,7 @@ type WorkerInstalledInfo struct {
WorkerId string `bson:"worker_id" json:"worker_id"` // use worker MinerPubkey
ModelId int `bson:"model_id" json:"model_id"` // installed model.
GpuFree int64 `bson:"gpu_free" json:"gpu_free"`
GpuSeq int `bson:"gpu_seq" json:"gpu_seq"`
}
type WorkerInstalledOperator struct {
......@@ -26,12 +27,25 @@ func NewDBWorkerInstalled(client *mongo.Client, database string) *WorkerInstalle
}
}
func (d *WorkerInstalledOperator) InsertMany(ctx context.Context, workers []*WorkerInstalledInfo) (*mongo.InsertManyResult, error) {
var docs []interface{}
for _, worker := range workers {
docs = append(docs, worker)
}
return d.col.InsertMany(ctx, docs)
}
func (d *WorkerInstalledOperator) DeleteMany(ctx context.Context, workerid string, models []int) (int, error) {
res, err := d.col.DeleteMany(ctx, bson.M{"worker_id": workerid, "model_id": bson.M{"$in": models}})
return int(res.DeletedCount), err
}
func (d *WorkerInstalledOperator) Insert(ctx context.Context, worker *WorkerInstalledInfo) (*mongo.InsertOneResult, error) {
return d.col.InsertOne(ctx, worker)
}
func (d *WorkerInstalledOperator) UpdateGpuFree(ctx context.Context, workerid string, gpuFree int64) error {
update := bson.M{"$set": bson.M{"gpu_free": gpuFree}}
func (d *WorkerInstalledOperator) UpdateGpuFree(ctx context.Context, workerid string, gpuFree int64, gpuSeq int) error {
update := bson.M{"$set": bson.M{"gpu_free": gpuFree, "gpu_seq": gpuSeq}}
_, err := d.col.UpdateMany(ctx, bson.M{"worker_id": workerid}, update)
return err
}
......
......@@ -26,13 +26,27 @@ func NewDBWorkerRunning(client *mongo.Client, database string) *WorkerRunningOpe
}
}
func (d *WorkerRunningOperator) InsertMany(ctx context.Context, workers []*WorkerRunningInfo) (*mongo.InsertManyResult, error) {
var docs []interface{}
for _, worker := range workers {
docs = append(docs, worker)
}
return d.col.InsertMany(ctx, docs)
}
func (d *WorkerRunningOperator) DeleteMany(ctx context.Context, workerid string, models []int) (int, error) {
res, err := d.col.DeleteMany(ctx, bson.M{"worker_id": workerid, "model_id": bson.M{"$in": models}})
return int(res.DeletedCount), err
}
func (d *WorkerRunningOperator) Insert(ctx context.Context, worker *WorkerRunningInfo) (*mongo.InsertOneResult, error) {
return d.col.InsertOne(ctx, worker)
}
func (d *WorkerRunningOperator) UpdateExecTime(ctx context.Context, id string, execTime int) error {
func (d *WorkerRunningOperator) UpdateExecTime(ctx context.Context, workerid string, model int, execTime int) error {
filter := bson.M{"worker_id": workerid, "model_id": model}
update := bson.M{"$set": bson.M{"exec_time": execTime}}
_, err := d.col.UpdateOne(ctx, bson.M{"_id": id}, update)
_, err := d.col.UpdateOne(ctx, filter, update)
return err
}
......
......@@ -57,21 +57,92 @@ func (d *WorkerInfoOperator) DeleteByWorkerId(ctx context.Context, workerid stri
return int(res.DeletedCount), err
}
func (d *WorkerInfoOperator) UpdateModel(ctx context.Context, id string, models *types.ModelInfo) error {
func (d *WorkerInfoOperator) UpdateModel(ctx context.Context, workerid string, models *types.ModelInfo) error {
update := bson.M{"$set": bson.M{"model_infos": models}}
_, err := d.col.UpdateOne(ctx, bson.M{"_id": id}, update)
_, err := d.col.UpdateOne(ctx, bson.M{"worker_id": workerid}, update)
return err
}
func (d *WorkerInfoOperator) UpdateHardware(ctx context.Context, id string, hardware *types.HardwareInfo) error {
func (d *WorkerInfoOperator) UpdateBenefitAddr(ctx context.Context, workerid string, benefitAddr string) error {
update := bson.M{"$set": bson.M{"node_info.benefit_addr": benefitAddr}}
_, err := d.col.UpdateOne(ctx, bson.M{"worker_id": workerid}, update)
return err
}
func (d *WorkerInfoOperator) AddRunningModels(ctx context.Context, workerid string, models []*types.RunningModel) error {
updates := make([]mongo.WriteModel, len(models))
for i, model := range models {
updates[i] = mongo.NewUpdateOneModel().SetFilter(bson.M{"worker_id": workerid}).SetUpdate(bson.M{"$push": bson.M{"model_infos.running_models": model}})
}
_, err := d.col.BulkWrite(ctx, updates)
return err
}
func (d *WorkerInfoOperator) DeleteRunningModels(ctx context.Context, workerid string, models []int) error {
updates := make([]mongo.WriteModel, len(models))
for i, model := range models {
updates[i] = mongo.NewUpdateOneModel().SetFilter(bson.M{"worker_id": workerid}).SetUpdate(bson.M{"$pull": bson.M{"model_infos.running_models": bson.M{"model_id": model}}})
}
_, err := d.col.BulkWrite(ctx, updates)
return err
}
func (d *WorkerInfoOperator) AddInstalledModels(ctx context.Context, workerid string, models []*types.InstalledModel) error {
updates := make([]mongo.WriteModel, len(models))
for i, model := range models {
updates[i] = mongo.NewUpdateOneModel().SetFilter(bson.M{"worker_id": workerid}).SetUpdate(bson.M{"$push": bson.M{"model_infos.installed_models": model}})
}
_, err := d.col.BulkWrite(ctx, updates)
return err
}
func (d *WorkerInfoOperator) DeleteInstalledModels(ctx context.Context, workerid string, modelids []int) error {
updates := make([]mongo.WriteModel, len(modelids))
for i, modelid := range modelids {
updates[i] = mongo.NewUpdateOneModel().SetFilter(bson.M{"worker_id": workerid}).SetUpdate(bson.M{"$pull": bson.M{"model_infos.installed_models": bson.M{"model_id": modelid}}})
}
_, err := d.col.BulkWrite(ctx, updates)
return err
}
func (d *WorkerInfoOperator) UpdateInstalledModelStatus(ctx context.Context, workerid string, modelid int, lastRunTime int64) error {
update := bson.M{"$set": bson.M{"model_infos.installed_models.$.last_run_time": lastRunTime}}
_, err := d.col.UpdateOne(ctx, bson.M{"worker_id": workerid, "model_infos.installed_models.model_id": modelid}, update)
return err
}
func (d *WorkerInfoOperator) UpdateRunningModelStatus(ctx context.Context, workerid string,
modelid int, lastWorkTime int64, totalRunCount int64, exectime int) error {
update := bson.M{"$set": bson.M{"model_infos.running_models.$.last_work_time": lastWorkTime, "model_infos.running_models.$.total_run_count": totalRunCount,
"model_infos.running_models.$.exec_time": exectime}}
_, err := d.col.UpdateOne(ctx, bson.M{"worker_id": workerid, "model_infos.running_models.model_id": modelid}, update)
return err
}
//
//func (d *WorkerInfoOperator) UpdateRunningModelInfo(ctx context.Context, workerid string, modelid string, execTime int) error {
// // update running model exec time.
// update := bson.M{"$set": bson.M{"model_infos.running_models.$.exec_time": execTime}}
// _, err := d.col.UpdateOne(ctx, bson.M{"worker_id": workerid, "model_infos.running_models.model_id": modelid}, update)
// return err
//}
//
//func (d *WorkerInfoOperator) UpdateInstalledModelInfo(ctx context.Context, workerid string, modelid string, gpuFree int64) error {
// // update installed model gpu free.
// update := bson.M{"$set": bson.M{"model_infos.installed_models.$.gpu_free": gpuFree}}
// _, err := d.col.UpdateOne(ctx, bson.M{"worker_id": workerid, "model_infos.installed_models.model_id": modelid}, update)
// return err
//}
func (d *WorkerInfoOperator) UpdateHardware(ctx context.Context, workerid string, hardware *types.HardwareInfo) error {
update := bson.M{"$set": bson.M{"hardware": hardware}}
_, err := d.col.UpdateOne(ctx, bson.M{"_id": id}, update)
_, err := d.col.UpdateOne(ctx, bson.M{"worker_id": workerid}, update)
return err
}
func (d *WorkerInfoOperator) UpdateNodeInfo(ctx context.Context, id string, nodeInfo *types.NodeInfo) error {
func (d *WorkerInfoOperator) UpdateNodeInfo(ctx context.Context, workerid string, nodeInfo *types.NodeInfo) error {
update := bson.M{"$set": bson.M{"node_info": nodeInfo}}
_, err := d.col.UpdateOne(ctx, bson.M{"_id": id}, update)
_, err := d.col.UpdateOne(ctx, bson.M{"worker_id": workerid}, update)
return err
}
......@@ -80,6 +151,31 @@ func (d *WorkerInfoOperator) UpdateWorker(ctx context.Context, worker *WorkerInf
return err
}
func (d *WorkerInfoOperator) UpdateHardwareUsage(ctx context.Context, workerid string, usage types.DeviceUsage) error {
_, err := d.col.BulkWrite(ctx, []mongo.WriteModel{
mongo.NewUpdateOneModel().SetFilter(bson.M{"worker_id": workerid}).SetUpdate(bson.M{"$set": bson.M{"hardware.CPU.usage": usage.CpuUsage}}),
mongo.NewUpdateOneModel().SetFilter(bson.M{"worker_id": workerid}).SetUpdate(bson.M{"$set": bson.M{"hardware.RAM.free": usage.RamUsage}}),
mongo.NewUpdateOneModel().SetFilter(bson.M{"worker_id": workerid}).SetUpdate(bson.M{"$set": bson.M{"hardware.DISK.free": usage.DiskUsage}}),
mongo.NewUpdateOneModel().SetFilter(bson.M{"worker_id": workerid}).SetUpdate(bson.M{"$set": bson.M{"hardware.NET.bandwidth": usage.NetBandwidth}}),
})
return err
}
func (d *WorkerInfoOperator) UpdateGPUUsage(ctx context.Context, workerid string, usages []types.GpuUsage) error {
updates := make([]mongo.WriteModel, len(usages))
for i, usage := range usages {
// update gpu info
//MemFree int64 `bson:"mem_free" json:"mem_free"`
//Usage int `bson:"usage" json:"usage"`
//Temp int `bson:"temp" json:"temp "`
//PowerRt int `bson:"power_rt" json:"power_rt"`
updates[i] = mongo.NewUpdateOneModel().SetFilter(bson.M{"worker_id": workerid, "hardware.gpu.seq": usage.GpuSeq}).SetUpdate(bson.M{"$set": bson.M{"hardware.gpu.$.usage": usage.Usage,
"hardware.gpu.$.mem_free": usage.MemFree, "hardware.gpu.$.temp": usage.Temp, "hardware.gpu.$.power_rt": usage.Powrt}})
}
_, err := d.col.BulkWrite(ctx, updates)
return err
}
func (d *WorkerInfoOperator) FindWorkerByRunningModelAndSortByWaitTime(ctx context.Context, modelId string, limit int) ([]*WorkerInfo, error) {
// find all worker that at least one running model's mode_id is equal modelId
// sort by wait time
......
......@@ -294,3 +294,37 @@ func (n *NetInfo) ToPb() *omanager.NetInfo {
Bandwidth: int32(n.Bandwidth),
}
}
type DeviceUsage struct {
CpuUsage int
RamUsage int64
DiskUsage int64
NetBandwidth int
}
func PbToDeviceUsage(pb *omanager.HardwareUsage) DeviceUsage {
return DeviceUsage{
CpuUsage: int(pb.CpuUsage),
RamUsage: int64(pb.RamUsage),
DiskUsage: int64(pb.DiskUsage),
NetBandwidth: int(pb.NetBandwidth),
}
}
type GpuUsage struct {
GpuSeq int
Usage int
MemFree int64
Powrt int
Temp int
}
func PbToGpuUsage(pb *omanager.GPUUsage) GpuUsage {
return GpuUsage{
GpuSeq: int(pb.Seq),
Usage: int(pb.Usage),
MemFree: pb.MemFree,
Powrt: int(pb.PowerRt),
Temp: int(pb.Temp),
}
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment