Commit 4f0faa9a authored by vicotor's avatar vicotor

update

parent 880088e7
...@@ -280,8 +280,12 @@ func (wm *WorkerManager) manageWorker(worker *Worker) error { ...@@ -280,8 +280,12 @@ func (wm *WorkerManager) manageWorker(worker *Worker) error {
gb.GoodbyeMessage = &omanager.GoodbyeMessage{} gb.GoodbyeMessage = &omanager.GoodbyeMessage{}
msg.Message = gb msg.Message = gb
case <-worker.quit: case reason, ok := <-worker.quit:
if ok {
log.WithField("reason", reason).WithField("worker-uuid", worker.uuid).Error("worker quit")
}
return nil return nil
case <-workerCheckTicker.C: case <-workerCheckTicker.C:
if worker.info.nodeInfo != nil { if worker.info.nodeInfo != nil {
nodeinfoTicker.Reset(time.Hour * 24) nodeinfoTicker.Reset(time.Hour * 24)
...@@ -295,12 +299,6 @@ func (wm *WorkerManager) manageWorker(worker *Worker) error { ...@@ -295,12 +299,6 @@ func (wm *WorkerManager) manageWorker(worker *Worker) error {
deviceUsageTicker.Reset(time.Second * time.Duration(tickerConf.DeviceUsageTicker)) deviceUsageTicker.Reset(time.Second * time.Duration(tickerConf.DeviceUsageTicker))
} }
if time.Now().Unix()-wm.GetHeartBeat(worker.uuid) > int64(workerCheckDuration.Seconds()) {
log.WithField("worker-uuid", worker.uuid).Error("worker heartbeat expired")
close(worker.quit)
return ErrHeartBeatExpired
}
if worker.registed && worker.addFirstSucceed == false && len(worker.deviceInfoHash) > 0 { if worker.registed && worker.addFirstSucceed == false && len(worker.deviceInfoHash) > 0 {
wm.AddWorkerToQueue(worker) wm.AddWorkerToQueue(worker)
} }
...@@ -422,16 +420,29 @@ func (wm *WorkerManager) handleWorkerMsg(worker *Worker) { ...@@ -422,16 +420,29 @@ func (wm *WorkerManager) handleWorkerMsg(worker *Worker) {
l := log.WithField("worker-uuid", worker.uuid) l := log.WithField("worker-uuid", worker.uuid)
l.WithField("worker-addr", worker.workerAddr).Info("start handle worker message") l.WithField("worker-addr", worker.workerAddr).Info("start handle worker message")
defer l.WithField("worker-addr", worker.workerAddr).Info("exit handle worker message") defer l.WithField("worker-addr", worker.workerAddr).Info("exit handle worker message")
defer close(worker.quit)
checkDuration := config.GetConfig().Tickers.HeartBeat * 3
workerCheckTicker := time.NewTicker(time.Second * time.Duration(checkDuration))
defer workerCheckTicker.Stop()
defer close(worker.quit)
for { for {
select { select {
case <-wm.quit: case <-wm.quit:
return return
case <-worker.quit: case <-workerCheckTicker.C:
if time.Now().Unix()-wm.GetHeartBeat(worker.uuid) > int64(checkDuration) {
log.WithField("worker-uuid", worker.uuid).Error("worker heartbeat expired")
worker.quit <- ErrHeartBeatExpired
return return
}
default: default:
wmsg, err := worker.stream.Recv() wmsg, err := worker.stream.Recv()
if err != nil { if err != nil {
l.WithError(err).WithField("worker-addr", worker.workerAddr).Error("recv msg failed") l.WithError(err).WithField("worker-addr", worker.workerAddr).Error("recv msg failed")
worker.quit <- "recv msg failed"
return return
} }
worker.online = true worker.online = true
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment