Commit 0ed6a1bd authored by vicotor's avatar vicotor

fix bug

parent 6a96b0f3
...@@ -46,7 +46,7 @@ func NewRegistryService(conf *config.Config, rdb *redis.Client, public ecdsa.Pub ...@@ -46,7 +46,7 @@ func NewRegistryService(conf *config.Config, rdb *redis.Client, public ecdsa.Pub
} }
func (s *RegistryService) Start() { func (s *RegistryService) Start() {
ticker := time.NewTicker(time.Second * 20) ticker := time.NewTicker(time.Second * 1)
defer ticker.Stop() defer ticker.Stop()
refresh := time.NewTicker(time.Second * 5) refresh := time.NewTicker(time.Second * 5)
defer refresh.Stop() defer refresh.Stop()
...@@ -58,6 +58,8 @@ func (s *RegistryService) Start() { ...@@ -58,6 +58,8 @@ func (s *RegistryService) Start() {
case <-ticker.C: case <-ticker.C:
if err := s.registry(s.rdb); err != nil { if err := s.registry(s.rdb); err != nil {
log.WithError(err).Error("registry failed") log.WithError(err).Error("registry failed")
} else {
ticker.Reset(time.Second * 10)
} }
case <-refresh.C: case <-refresh.C:
if nodes, err := s.allNodeManager(s.rdb); err != nil { if nodes, err := s.allNodeManager(s.rdb); err != nil {
...@@ -113,7 +115,7 @@ func (s *RegistryService) GetNodeManagerList(filter ManagerFilter) []NodeManager ...@@ -113,7 +115,7 @@ func (s *RegistryService) GetNodeManagerList(filter ManagerFilter) []NodeManager
func (s *RegistryService) allNodeManager(rdb *redis.Client) ([]RegistryInfo, error) { func (s *RegistryService) allNodeManager(rdb *redis.Client) ([]RegistryInfo, error) {
var ret []RegistryInfo var ret []RegistryInfo
var tsExpired = 30 var tsExpired = 100
keys, err := rdb.Keys(context.Background(), config.NODE_MANAGER_SET+"*").Result() keys, err := rdb.Keys(context.Background(), config.NODE_MANAGER_SET+"*").Result()
if err != nil { if err != nil {
return nil, err return nil, err
......
...@@ -97,6 +97,7 @@ func (wm *WorkerManager) AddNewWorker(uuid int64, worker omanager.NodeManagerSer ...@@ -97,6 +97,7 @@ func (wm *WorkerManager) AddNewWorker(uuid int64, worker omanager.NodeManagerSer
resultCh: make(chan *omanager.SubmitTaskResult), resultCh: make(chan *omanager.SubmitTaskResult),
uuid: uuid, uuid: uuid,
stream: worker, stream: worker,
quit: make(chan interface{}),
} }
taskCache, err := lru.New(100) taskCache, err := lru.New(100)
if err != nil { if err != nil {
...@@ -134,6 +135,8 @@ func (wm *WorkerManager) manageWorker(worker *Worker) error { ...@@ -134,6 +135,8 @@ func (wm *WorkerManager) manageWorker(worker *Worker) error {
deviceUsageTicker := time.NewTicker(time.Second * 10) deviceUsageTicker := time.NewTicker(time.Second * 10)
defer deviceUsageTicker.Stop() defer deviceUsageTicker.Stop()
defer wm.InActiveWorker(worker)
for { for {
var msg = new(omanager.ManagerMessage) var msg = new(omanager.ManagerMessage)
var callback = Callback(func(err error) bool { var callback = Callback(func(err error) bool {
...@@ -146,11 +149,13 @@ func (wm *WorkerManager) manageWorker(worker *Worker) error { ...@@ -146,11 +149,13 @@ func (wm *WorkerManager) manageWorker(worker *Worker) error {
gb := new(omanager.ManagerMessage_GoodbyeMessage) gb := new(omanager.ManagerMessage_GoodbyeMessage)
gb.GoodbyeMessage = &omanager.GoodbyeMessage{} gb.GoodbyeMessage = &omanager.GoodbyeMessage{}
msg.Message = gb msg.Message = gb
case <-worker.quit:
return nil
case <-workerCheckTicker.C: case <-workerCheckTicker.C:
if time.Now().Unix()-wm.GetHeartBeat(worker.uuid) > int64(workerCheckDuration.Seconds()) { if time.Now().Unix()-wm.GetHeartBeat(worker.uuid) > int64(workerCheckDuration.Seconds()) {
wm.InActiveWorker(worker) wm.InActiveWorker(worker)
// remove worker // todo: remove worker
close(worker.quit)
return ErrHeartBeatExpired return ErrHeartBeatExpired
} }
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment