Commit 6163548d authored by vicotor's avatar vicotor

add worker queue in redis

parent 9e91cc80
package config
const (
NODE_MANAGER_SET = "node_manager_set"
NODE_MANAGER_SET = "node_manager_set"
WORKER_STATUS_PREFIX = "worker_status_"
WORKER_QUEUE_PREFIX = "worker_queue_"
)
......@@ -16,7 +16,8 @@ import (
)
var (
ErrWorkerExist = errors.New("worker exist")
ErrWorkerExist = errors.New("worker exist")
ErrHeartBeatExpired = errors.New("worker heartbeat expired")
)
type dispatchTask struct {
......@@ -111,6 +112,10 @@ func (wm *WorkerManager) AddNewWorker(uuid int64, worker omanager.NodeManagerSer
type Callback func(err error) bool
func (wm *WorkerManager) manageWorker(worker *Worker) error {
log.WithField("worker", worker.uuid).Info("start manage worker")
defer log.WithField("worker", worker.uuid).Info("exit manage worker")
heartBeatDuration := time.Second * 10
workerCheckDuration := heartBeatDuration * 3
......@@ -143,8 +148,10 @@ func (wm *WorkerManager) manageWorker(worker *Worker) error {
msg.Message = gb
case <-workerCheckTicker.C:
if time.Now().Unix()-wm.GetHeartBeat(worker.uuid) > int64(workerCheckDuration.Seconds()) {
wm.InActiveWorker(worker)
// remove worker
close(worker.quit)
return ErrHeartBeatExpired
}
case <-heartBeatTicker.C:
......@@ -267,6 +274,8 @@ func (wm *WorkerManager) manageWorker(worker *Worker) error {
return nil
}
func (wm *WorkerManager) handleWorkerMsg(worker *Worker) {
log.WithField("worker", worker.uuid).Info("start handle worker message")
defer log.WithField("worker", worker.uuid).Info("exit handle worker message")
for {
select {
case <-wm.quit:
......@@ -307,6 +316,7 @@ func (wm *WorkerManager) handleWorkerMsg(worker *Worker) {
// first time receive device info
worker.publicKey = msg.DeviceInfo.MinerPubkey
worker.deviceInfo = msg.DeviceInfo.Devices
wm.AddWorker(worker)
}
case *omanager.WorkerMessage_DeviceUsage:
......
package server
import (
"context"
"github.com/odysseus/nodemanager/config"
odysseus "github.com/odysseus/odysseus-protocol/gen/proto/go/base/v1"
"strconv"
)
func (wm *WorkerManager) AddWorker(worker *Worker) {
for _, device := range worker.deviceInfo {
// add device to redis
priority := 0
_ = device // todo: set priority with device info.
if err := wm.rdb.LPush(context.Background(), config.WORKER_QUEUE_PREFIX+strconv.Itoa(priority), worker.uuid).Err(); err != nil {
continue
}
}
// add worker to redis queue
wm.rdb.Set(context.Background(), config.WORKER_STATUS_PREFIX+strconv.FormatInt(worker.uuid, 10), odysseus.WorkerStatus_WorkerStatusActive, 0)
}
func (wm *WorkerManager) ActiveWorker(worker *Worker) {
wm.rdb.Set(context.Background(), config.WORKER_STATUS_PREFIX+strconv.FormatInt(worker.uuid, 10), odysseus.WorkerStatus_WorkerStatusActive, 0)
}
func (wm *WorkerManager) InActiveWorker(worker *Worker) {
wm.rdb.Set(context.Background(), config.WORKER_STATUS_PREFIX+strconv.FormatInt(worker.uuid, 10), odysseus.WorkerStatus_WorkerStatusInActive, 0)
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment