Commit 177c8467 authored by vicotor's avatar vicotor

update for worker manager

parent c295aa4e
...@@ -152,32 +152,34 @@ func (wm *WorkerManager) LastNmValue(worker *Worker) string { ...@@ -152,32 +152,34 @@ func (wm *WorkerManager) LastNmValue(worker *Worker) string {
} }
} }
return "" return ""
} }
func (wm *WorkerManager) UpdateWorkerActive(worker *Worker) { func (wm *WorkerManager) deleteOldNmValue(worker *Worker) {
if !worker.online { nmlist, _ := wm.WorkerNmList(worker)
return for _, nmvalue := range nmlist {
} endpoint, _ := wm.parseWorkerNmValue(nmvalue)
old := worker.latestNmValue if endpoint == config.GetConfig().PublicEndpoint() {
if newNm, err := wm.activeWorker(worker); err != nil { wm.rdb.SRem(context.Background(), workerStatusKey(worker), nmvalue)
log.WithFields(log.Fields{
"worker": worker.WorkerAccount().String(),
"error": err,
}).Error("active worker failed")
return
} else {
if old != newNm {
wm.rdb.SRem(context.Background(), workerStatusKey(worker), old)
} }
} }
} }
func (wm *WorkerManager) activeWorker(worker *Worker) (string, error) { func (wm *WorkerManager) addNewNmValue(worker *Worker) error {
split := "#" split := "#"
v := fmt.Sprintf("%s%s%d", config.GetConfig().PublicEndpoint(), split, time.Now().Unix()) v := fmt.Sprintf("%s%s%d", config.GetConfig().PublicEndpoint(), split, time.Now().Unix())
worker.latestNmValue = v worker.latestNmValue = v
return v, wm.rdb.SAdd(context.Background(), workerStatusKey(worker), v).Err() return wm.rdb.SAdd(context.Background(), workerStatusKey(worker), v).Err()
}
func (wm *WorkerManager) UpdateWorkerActive(worker *Worker) {
if !worker.online {
return
}
wm.deleteOldNmValue(worker)
err := wm.addNewNmValue(worker)
if err != nil {
log.WithError(err).Error("add new nm value failed")
}
} }
func (wm *WorkerManager) parseWorkerNmValue(nmValue string) (string, int64) { func (wm *WorkerManager) parseWorkerNmValue(nmValue string) (string, int64) {
...@@ -196,18 +198,23 @@ func (wm *WorkerManager) WorkerNmList(worker *Worker) ([]string, error) { ...@@ -196,18 +198,23 @@ func (wm *WorkerManager) WorkerNmList(worker *Worker) ([]string, error) {
} }
func (wm *WorkerManager) InActiveWorker(worker *Worker) { func (wm *WorkerManager) InActiveWorker(worker *Worker) {
wm.rdb.SRem(context.Background(), workerStatusKey(worker), worker.latestNmValue) wm.deleteOldNmValue(worker)
nmlist, err := wm.WorkerNmList(worker)
if list, err := wm.rdb.SMembers(context.Background(), workerStatusKey(worker)).Result(); err == nil && len(list) == 0 { if err == nil && len(nmlist) == 0 {
wm.rdb.Del(context.Background(), workerStatusKey(worker)) wm.rdb.Del(context.Background(), workerStatusKey(worker))
if worker.info != nil { if worker.info != nil {
wm.delWorkerFromWhiteListSet(worker, worker.info.Info.BenefitAddress) wm.delWorkerFromWhiteListSet(worker, worker.info.Info.BenefitAddress)
wm.rdb.Del(context.Background(), workerLastTaskTmKey(worker))
// delete worker info from mogo. // delete worker info from mogo.
n, err := wm.workerInfoOperator.DeleteByWorkerId(context.Background(), worker.WorkerAccount().String()) n, err := wm.workerInfoOperator.DeleteByWorkerId(context.Background(), worker.WorkerAccount().String())
if err != nil { if err != nil {
log.WithError(err).Error("delete worker info failed") log.WithError(err).Error("delete worker info failed")
} }
log.Debugf("delete worker info count %d", n) log.Debugf("delete worker info count %d", n)
n, err = wm.workerRunningOperator.DeleteByWorkerId(context.Background(), worker.WorkerAccount().String())
log.Debugf("delete worker running info count %d", n)
n, err = wm.workerInstalledOperator.DeleteByWorkerId(context.Background(), worker.WorkerAccount().String())
log.Debugf("delete worker installed info count %d", n)
} }
} }
} }
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment