Commit 20e9b767 authored by vicotor

Update ticker intervals and add logging for more worker message types

parent a396ec6b
...@@ -240,7 +240,7 @@ func (wm *WorkerManager) manageWorker(worker *Worker) error { ...@@ -240,7 +240,7 @@ func (wm *WorkerManager) manageWorker(worker *Worker) error {
heartBeatTicker := time.NewTicker(initialHeartBeatInterval) heartBeatTicker := time.NewTicker(initialHeartBeatInterval)
defer heartBeatTicker.Stop() defer heartBeatTicker.Stop()
nodeinfoTicker := time.NewTicker(initialHeartBeatInterval) nodeinfoTicker := time.NewTicker(initialHeartBeatInterval * 10)
defer nodeinfoTicker.Stop() defer nodeinfoTicker.Stop()
workerCheckTicker := time.NewTicker(workerCheckDuration) workerCheckTicker := time.NewTicker(workerCheckDuration)
...@@ -249,7 +249,7 @@ func (wm *WorkerManager) manageWorker(worker *Worker) error { ...@@ -249,7 +249,7 @@ func (wm *WorkerManager) manageWorker(worker *Worker) error {
deviceUsageTicker := time.NewTicker(initialInterval) deviceUsageTicker := time.NewTicker(initialInterval)
defer deviceUsageTicker.Stop() defer deviceUsageTicker.Stop()
gpuUsageTicker := time.NewTicker(initialInterval) gpuUsageTicker := time.NewTicker(initialInterval * 5)
defer gpuUsageTicker.Stop() defer gpuUsageTicker.Stop()
worker.status = "connected" worker.status = "connected"
...@@ -285,7 +285,7 @@ func (wm *WorkerManager) manageWorker(worker *Worker) error { ...@@ -285,7 +285,7 @@ func (wm *WorkerManager) manageWorker(worker *Worker) error {
case <-workerCheckTicker.C: case <-workerCheckTicker.C:
if worker.info.nodeInfo != nil { if worker.info.nodeInfo != nil {
nodeinfoTicker.Reset(time.Hour * 24) //nodeinfoTicker.Reset(time.Hour * 24)
} }
if worker.usage.hwUsage != nil { if worker.usage.hwUsage != nil {
...@@ -604,6 +604,71 @@ func (wm *WorkerManager) handleWorkerMsg(worker *Worker) { ...@@ -604,6 +604,71 @@ func (wm *WorkerManager) handleWorkerMsg(worker *Worker) {
"worker-addr": worker.workerAddr, "worker-addr": worker.workerAddr,
}).Debugf("receive worker device usage:%v", msg.DeviceUsage.Usage) }).Debugf("receive worker device usage:%v", msg.DeviceUsage.Usage)
case *omanager.WorkerMessage_AddModelRunning:
if !worker.registed {
continue
}
l.WithFields(log.Fields{
"worker-addr": worker.workerAddr,
"model count": len(msg.AddModelRunning.Models),
}).Debugf("receive worker add model running:%v", msg.AddModelRunning.Models)
case *omanager.WorkerMessage_DelModeRunning:
if !worker.registed {
continue
}
l.WithFields(log.Fields{
"worker-addr": worker.workerAddr,
"model count": len(msg.DelModeRunning.ModelIds),
}).Debugf("receive worker del model running:%v", msg.DelModeRunning.ModelIds)
case *omanager.WorkerMessage_AddModelInstalled:
if !worker.registed {
continue
}
l.WithFields(log.Fields{
"worker-addr": worker.workerAddr,
"model count": len(msg.AddModelInstalled.Models),
}).Debugf("receive worker add model installed:%v", msg.AddModelInstalled.Models)
case *omanager.WorkerMessage_DelModelInstalled:
if !worker.registed {
continue
}
l.WithFields(log.Fields{
"worker-addr": worker.workerAddr,
"model count": len(msg.DelModelInstalled.ModelIds),
}).Debugf("receive worker del model installed:%v", msg.DelModelInstalled.ModelIds)
case *omanager.WorkerMessage_InstalledModelStatus:
if !worker.registed {
continue
}
l.WithFields(log.Fields{
"worker-addr": worker.workerAddr,
"model": len(msg.InstalledModelStatus.ModelId),
"type": "status",
}).Debugf("receive worker installed model status:%v", msg.InstalledModelStatus)
case *omanager.WorkerMessage_RunningModelStatus:
if !worker.registed {
continue
}
l.WithFields(log.Fields{
"worker-addr": worker.workerAddr,
"model": len(msg.RunningModelStatus.ModelId),
"type": "status",
}).Debugf("receive worker running model status:%v", msg.RunningModelStatus)
case *omanager.WorkerMessage_GpuUsage:
if !worker.registed {
continue
}
l.WithFields(log.Fields{
"worker-addr": worker.workerAddr,
"usage count": len(msg.GpuUsage.Usages),
}).Debugf("receive worker gpu usage:%v", msg.GpuUsage.Usages)
case *omanager.WorkerMessage_RegisteMessage: case *omanager.WorkerMessage_RegisteMessage:
if worker.registed { if worker.registed {
continue continue
......
...@@ -84,6 +84,12 @@ func (wm *WorkerManager) AddWorkerFirst(worker *Worker) error { ...@@ -84,6 +84,12 @@ func (wm *WorkerManager) AddWorkerFirst(worker *Worker) error {
Models: types.PbToModelInfo(worker.info.nodeInfo.Models), Models: types.PbToModelInfo(worker.info.nodeInfo.Models),
Hardware: types.PbToHardwareInfo(worker.info.nodeInfo.Hardware), Hardware: types.PbToHardwareInfo(worker.info.nodeInfo.Hardware),
}) })
for _, installed := range worker.info.nodeInfo.Models.InstalledModels {
wm.workerInstalledOperator.Insert(context.Background(), &operator.WorkerInstalledInfo{
WorkerId: worker.WorkerAccount().String(),
ModelId: installed.ModelId,
})
}
if err != nil { if err != nil {
log.WithError(err).Error("insert worker info failed") log.WithError(err).Error("insert worker info failed")
return err return err
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment