Commit 2c0444ef authored by vicotor's avatar vicotor

update nodemanager

parent 4da93b1a
remote_host="192.168.1.112" remote_host="192.168.2.30"
local_host="192.168.1.112" local_host="192.168.2.30"
port=10001 port=10001
metrics_port = 28010 metrics_port = 28010
private_key = "E671C143A110C239B563F702E9F4017CA6B2B2912F675EED9AA4FED684EB30CC" private_key = "E671C143A110C239B563F702E9F4017CA6B2B2912F675EED9AA4FED684EB30CC"
standard_task_file = "/Users/luxq/work/wuban/nodemanager/standardtask.json" standard_task_file = "standardtask.json"
[redis] [redis]
addr="127.0.0.1:6379" addr="127.0.0.1:6379"
...@@ -11,11 +11,16 @@ password="123456" ...@@ -11,11 +11,16 @@ password="123456"
db=0 db=0
[mysql] [mysql]
host="192.168.1.211" #host="192.168.1.211"
#port=3306
#user="root"
#password="12345678"
#database="liuxuzhong"
host="127.0.0.1"
port=3306 port=3306
user="root" user="ai"
password="12345678" password="RFnnKHRar5xk7TEF"
database="liuxuzhong" database="ai"
[kafka] [kafka]
brokers="127.0.0.1:9092" brokers="127.0.0.1:9092"
...@@ -28,3 +33,4 @@ heart_beat = 10 ...@@ -28,3 +33,4 @@ heart_beat = 10
status_ticker = 10 status_ticker = 10
device_info_ticker = 120 device_info_ticker = 120
device_usage_ticker = 100 device_usage_ticker = 100
worker_task_expire_ticker = 60
...@@ -3,6 +3,7 @@ package config ...@@ -3,6 +3,7 @@ package config
import ( import (
"fmt" "fmt"
"github.com/BurntSushi/toml" "github.com/BurntSushi/toml"
"github.com/odysseus/nodemanager/utils"
log "github.com/sirupsen/logrus" log "github.com/sirupsen/logrus"
"io/ioutil" "io/ioutil"
) )
...@@ -29,10 +30,11 @@ type RedisConfig struct { ...@@ -29,10 +30,11 @@ type RedisConfig struct {
} }
type TickerConfig struct { type TickerConfig struct {
HeartBeat int `json:"heart_beat" toml:"heart_beat"` HeartBeat int `json:"heart_beat" toml:"heart_beat"`
StatusTicker int `json:"status_ticker" toml:"status_ticker"` StatusTicker int `json:"status_ticker" toml:"status_ticker"`
DeviceInfoTicker int `json:"device_info_ticker" toml:"device_info_ticker"` DeviceInfoTicker int `json:"device_info_ticker" toml:"device_info_ticker"`
DeviceUsageTicker int `json:"device_usage_ticker" toml:"device_usage_ticker"` DeviceUsageTicker int `json:"device_usage_ticker" toml:"device_usage_ticker"`
WorkerTaskExpireTicker int `json:"worker_task_expire_ticker" toml:"worker_task_expire_ticker"`
} }
type Config struct { type Config struct {
...@@ -64,6 +66,13 @@ func ParseConfig(path string) (*Config, error) { ...@@ -64,6 +66,13 @@ func ParseConfig(path string) (*Config, error) {
log.Error("unmarshal config failed", "err", err) log.Error("unmarshal config failed", "err", err)
panic(err) panic(err)
} }
if _cfg.RemoteHost == "" {
ip, err := utils.PublicIp()
if err != nil {
log.Error("get public ip failed", "err", err)
}
_cfg.RemoteHost = ip
}
return _cfg, nil return _cfg, nil
} }
......
...@@ -10,4 +10,5 @@ const ( ...@@ -10,4 +10,5 @@ const (
WORKER_DEVICE_STATUS_PREFIX = "worker_device_status_" WORKER_DEVICE_STATUS_PREFIX = "worker_device_status_"
WORKER_USAGE_INFO_PREFIX = "worker_usage_info_" WORKER_USAGE_INFO_PREFIX = "worker_usage_info_"
WORKER_RESOURCE_INFO_PREFIX = "worker_resource_info_" WORKER_RESOURCE_INFO_PREFIX = "worker_resource_info_"
WORKER_LAST_TASK_TM_PREFIX = "worker_last_task_tm_"
) )
...@@ -7,7 +7,7 @@ services: ...@@ -7,7 +7,7 @@ services:
mysql: mysql:
container_name: "ai-mysql" container_name: "ai-mysql"
ports : ports :
- "33306:3306" - "3306:3306"
environment: environment:
- MYSQL_ROOT_PASSWORD=RFnnKHRar5xk7TEF - MYSQL_ROOT_PASSWORD=RFnnKHRar5xk7TEF
- MYSQL_USER=ai - MYSQL_USER=ai
......
...@@ -28,6 +28,7 @@ var ( ...@@ -28,6 +28,7 @@ var (
Succeed = errors.New("succeed") Succeed = errors.New("succeed")
ErrWorkerExist = errors.New("worker exist") ErrWorkerExist = errors.New("worker exist")
ErrHeartBeatExpired = errors.New("worker heartbeat expired") ErrHeartBeatExpired = errors.New("worker heartbeat expired")
ErrLongtimeNoTask = errors.New("worker long time no task")
ErrInvalidMessageValue = errors.New("invalid message value") ErrInvalidMessageValue = errors.New("invalid message value")
) )
...@@ -368,6 +369,9 @@ func (wm *WorkerManager) manageWorker(worker *Worker) error { ...@@ -368,6 +369,9 @@ func (wm *WorkerManager) manageWorker(worker *Worker) error {
if err == nil { if err == nil {
// add task to cache. // add task to cache.
worker.recentTask.Add(task.TaskId, task) worker.recentTask.Add(task.TaskId, task)
if e := wm.setWorkerLastTaskTime(worker, time.Now().Unix()); e != nil {
log.WithField("worker", worker.uuid).WithError(e).Error("set worker last task time failed")
}
} }
log.WithField("worker", worker.uuid).Info("dispatch task to worker") log.WithField("worker", worker.uuid).Info("dispatch task to worker")
...@@ -440,6 +444,19 @@ func (wm *WorkerManager) handleWorkerMsg(worker *Worker) { ...@@ -440,6 +444,19 @@ func (wm *WorkerManager) handleWorkerMsg(worker *Worker) {
worker.quit <- ErrHeartBeatExpired worker.quit <- ErrHeartBeatExpired
return return
} }
if lastTaskTm, err := wm.getWorkerLastTaskTime(worker); err != nil {
log.WithError(err).Error("get worker last task time failed")
} else {
expire := config.GetConfig().Tickers.WorkerTaskExpireTicker
if expire <= 0 {
expire = 60 // default value
}
if time.Now().Unix()-lastTaskTm > int64(expire) {
log.WithField("worker-uuid", worker.uuid).Error("worker last task time expired")
worker.quit <- ErrLongtimeNoTask
return
}
}
default: default:
wmsg, err := worker.stream.Recv() wmsg, err := worker.stream.Recv()
if err != nil { if err != nil {
......
...@@ -93,6 +93,7 @@ func (wm *WorkerManager) AddWorkerToQueue(worker *Worker) { ...@@ -93,6 +93,7 @@ func (wm *WorkerManager) AddWorkerToQueue(worker *Worker) {
} }
} else { } else {
// else if nmlist is not empty, clear and add self to it. // else if nmlist is not empty, clear and add self to it.
worker.nonce = nonce
wm.rdb.Del(context.Background(), workerStatusKey(worker)) wm.rdb.Del(context.Background(), workerStatusKey(worker))
wm.UpdateWorkerActive(worker) wm.UpdateWorkerActive(worker)
} }
...@@ -196,6 +197,14 @@ func (wm *WorkerManager) getWorkerSets(benefit string) ([]string, error) { ...@@ -196,6 +197,14 @@ func (wm *WorkerManager) getWorkerSets(benefit string) ([]string, error) {
return list, err return list, err
} }
// getWorkerLastTaskTime reads the value stored in Redis under the worker's
// last-task-time key and parses it as an int64. The dispatch path writes
// time.Now().Unix() to this key, so the result is a Unix timestamp in seconds.
//
// Any error from the Redis read or from the integer conversion is returned
// unchanged.
// NOTE(review): a key that was never written presumably surfaces as an error
// here rather than as 0 — confirm that callers treat that case as intended.
func (wm *WorkerManager) getWorkerLastTaskTime(worker *Worker) (int64, error) {
	key := workerLastTaskTmKey(worker)
	cmd := wm.rdb.Get(context.Background(), key)
	return cmd.Int64()
}
// setWorkerLastTaskTime stores tm in Redis under the worker's last-task-time
// key and returns the error reported by the Redis command, if any.
//
// The expiration argument passed to Set is 0.
// NOTE(review): with the Redis client in use this presumably means the key
// never expires — confirm that stale keys are cleaned up elsewhere.
func (wm *WorkerManager) setWorkerLastTaskTime(worker *Worker, tm int64) error {
	key := workerLastTaskTmKey(worker)
	status := wm.rdb.Set(context.Background(), key, tm, 0)
	return status.Err()
}
func (wm *WorkerManager) checkWhiteList(worker *Worker, benefit string) error { func (wm *WorkerManager) checkWhiteList(worker *Worker, benefit string) error {
wh, err := wm.node.cache.GetWhWithAddr(benefit) wh, err := wm.node.cache.GetWhWithAddr(benefit)
if err != nil { if err != nil {
...@@ -253,6 +262,10 @@ func workerStatusKey(w *Worker) string { ...@@ -253,6 +262,10 @@ func workerStatusKey(w *Worker) string {
return fmt.Sprintf("%s_%s", config.WORKER_STATUS_PREFIX, id) return fmt.Sprintf("%s_%s", config.WORKER_STATUS_PREFIX, id)
} }
// workerLastTaskTmKey builds the Redis key that holds a worker's last task
// time: config.WORKER_LAST_TASK_TM_PREFIX followed directly by the worker
// address. Unlike workerId, the worker nonce is not part of this key.
func workerLastTaskTmKey(w *Worker) string {
	const prefix = config.WORKER_LAST_TASK_TM_PREFIX
	addr := w.workerAddr
	return prefix + addr
}
func workerId(w *Worker) string { func workerId(w *Worker) string {
return fmt.Sprintf("%s_%d", w.workerAddr, w.nonce) return fmt.Sprintf("%s_%d", w.workerAddr, w.nonce)
} }
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment