Commit 68970e05 authored by vicotor's avatar vicotor

update workerid

parent 0b2745ef
...@@ -6,6 +6,7 @@ import ( ...@@ -6,6 +6,7 @@ import (
"github.com/odysseus/nodemanager/utils" "github.com/odysseus/nodemanager/utils"
omanager "github.com/odysseus/odysseus-protocol/gen/proto/go/nodemanager/v1" omanager "github.com/odysseus/odysseus-protocol/gen/proto/go/nodemanager/v1"
log "github.com/sirupsen/logrus" log "github.com/sirupsen/logrus"
"strconv"
"strings" "strings"
) )
...@@ -63,6 +64,14 @@ func (n *NodeManagerService) DispatchTask(ctx context.Context, request *omanager ...@@ -63,6 +64,14 @@ func (n *NodeManagerService) DispatchTask(ctx context.Context, request *omanager
if worker.online == false { if worker.online == false {
return nil, errors.New("worker offline") return nil, errors.New("worker offline")
} }
{
nonceds := strings.Split(mids[1], ":")
nonce, _ := strconv.ParseInt(nonceds[0], 10, 64)
if nonce < int64(worker.nonce) {
return nil, errors.New("expired worker nonce")
}
}
dtask := &dispatchTask{ dtask := &dispatchTask{
task: request.TaskData, task: request.TaskData,
errCh: make(chan error, 1), errCh: make(chan error, 1),
......
...@@ -6,6 +6,7 @@ import ( ...@@ -6,6 +6,7 @@ import (
"errors" "errors"
"fmt" "fmt"
"github.com/gomodule/redigo/redis" "github.com/gomodule/redigo/redis"
"github.com/google/uuid"
"github.com/odysseus/nodemanager/config" "github.com/odysseus/nodemanager/config"
log "github.com/sirupsen/logrus" log "github.com/sirupsen/logrus"
"strconv" "strconv"
...@@ -67,7 +68,7 @@ func (wm *WorkerManager) AddWorkerFirst(worker *Worker) error { ...@@ -67,7 +68,7 @@ func (wm *WorkerManager) AddWorkerFirst(worker *Worker) error {
_ = device // todo: set priority with device info. _ = device // todo: set priority with device info.
for m := 0; m < config.GetConfig().GetWorkerMultiple(); m++ { for m := 0; m < config.GetConfig().GetWorkerMultiple(); m++ {
// add worker to redis queue // add worker to redis queue
if err := wm.rdb.RPush(context.Background(), config.WORKER_QUEUE_PREFIX+strconv.Itoa(priority), workerId(worker)).Err(); err != nil { if err := wm.rdb.RPush(context.Background(), config.WORKER_QUEUE_PREFIX+strconv.Itoa(priority), workerUid(worker)).Err(); err != nil {
continue continue
} }
} }
...@@ -115,7 +116,7 @@ func (wm *WorkerManager) AddWorkerSingle(worker *Worker) error { ...@@ -115,7 +116,7 @@ func (wm *WorkerManager) AddWorkerSingle(worker *Worker) error {
{ {
// add worker to redis queue // add worker to redis queue
priority := 0 priority := 0
if err := wm.rdb.RPush(context.Background(), config.WORKER_QUEUE_PREFIX+strconv.Itoa(priority), workerId(worker)).Err(); err != nil { if err := wm.rdb.RPush(context.Background(), config.WORKER_QUEUE_PREFIX+strconv.Itoa(priority), workerUid(worker)).Err(); err != nil {
log.WithError(err).Error("add worker back to queue failed.") log.WithError(err).Error("add worker back to queue failed.")
} else { } else {
log.WithField("worker", worker.workerAddr).Info("add worker back to queue success.") log.WithField("worker", worker.workerAddr).Info("add worker back to queue success.")
...@@ -269,6 +270,12 @@ func workerLastTaskTmKey(w *Worker) string { ...@@ -269,6 +270,12 @@ func workerLastTaskTmKey(w *Worker) string {
return config.WORKER_LAST_TASK_TM_PREFIX + w.workerAddr return config.WORKER_LAST_TASK_TM_PREFIX + w.workerAddr
} }
func workerUid(w *Worker) string {
id := workerId(w)
uuid := uuid.NewString()
return fmt.Sprintf("%s:%s", id, uuid[:4])
}
func workerId(w *Worker) string { func workerId(w *Worker) string {
return fmt.Sprintf("%s_%d", w.workerAddr, w.nonce) return fmt.Sprintf("%s_%d", w.workerAddr, w.nonce)
} }
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment