Commit 318b3a38 authored by vicotor's avatar vicotor

update schedule

parent 675678e9
...@@ -353,11 +353,12 @@ func (c *Consumer) ConsumeClaim(session sarama.ConsumerGroupSession, claim saram ...@@ -353,11 +353,12 @@ func (c *Consumer) ConsumeClaim(session sarama.ConsumerGroupSession, claim saram
} }
func (n *Node) addWorkerBack(w Worker) { func (n *Node) addWorkerBack(w Worker) {
n.rdb.RPush(context.Background(), config.WORKER_QUEUE_PREFIX+strconv.Itoa(w.priority), workerId(w)) n.rdb.RPush(context.Background(), config.WORKER_QUEUE_PREFIX+strconv.Itoa(w.priority), w.workerid)
log.WithField("worker", w.workerid).Debug("add worker back to queue") log.WithField("worker", w.workerid).Debug("add worker back to queue")
} }
func (n *Node) PopWorker(ctx context.Context, rdb *redis.Client, task *odysseus.TaskContent) (Worker, error) { func (n *Node) PopWorker(ctx context.Context, rdb *redis.Client, task *odysseus.TaskContent) (Worker, error) {
var checkedWorker = make(map[string]bool)
for i := 0; i < maxPriority; i++ { for i := 0; i < maxPriority; i++ {
for { for {
...@@ -390,11 +391,15 @@ func (n *Node) PopWorker(ctx context.Context, rdb *redis.Client, task *odysseus. ...@@ -390,11 +391,15 @@ func (n *Node) PopWorker(ctx context.Context, rdb *redis.Client, task *odysseus.
} }
if !checkWorkerHasResource(rdb, worker.addr, task.TaskType) { if !checkWorkerHasResource(rdb, worker.addr, task.TaskType) {
n.addWorkerBack(worker) n.addWorkerBack(worker)
continue if checked := checkedWorker[worker.workerid]; checked {
break
} else {
checkedWorker[worker.workerid] = true
continue
}
} }
return worker, nil return worker, nil
} }
} }
return Worker{}, ErrNoWorker return Worker{}, ErrNoWorker
} }
...@@ -24,7 +24,7 @@ var ( ...@@ -24,7 +24,7 @@ var (
var ( var (
ErrNoWorker = errors.New("no worker") ErrNoWorker = errors.New("no worker")
ErrTimeout = errors.New("timeout") ErrTimeout = errors.New("schedule timeout")
ErrDispatchFailed = errors.New("dispatch to nodemanager failed") ErrDispatchFailed = errors.New("dispatch to nodemanager failed")
) )
...@@ -48,7 +48,6 @@ func checkWorkerHasResource(rdb *redis.Client, addr string, resource uint64) boo ...@@ -48,7 +48,6 @@ func checkWorkerHasResource(rdb *redis.Client, addr string, resource uint64) boo
return false return false
} }
return b.IsSet(resource) return b.IsSet(resource)
} }
func workerResourceInfoKey(addr string) string { func workerResourceInfoKey(addr string) string {
...@@ -58,9 +57,6 @@ func workerResourceInfoKey(addr string) string { ...@@ -58,9 +57,6 @@ func workerResourceInfoKey(addr string) string {
func workerStatusKey(wid string) string { func workerStatusKey(wid string) string {
return fmt.Sprintf("%s_%s", config.WORKER_STATUS_PREFIX, wid) return fmt.Sprintf("%s_%s", config.WORKER_STATUS_PREFIX, wid)
} }
func workerId(w Worker) string {
return fmt.Sprintf("%s_%d", w.addr, w.nonce)
}
func newManagerClient(endpoint string) (omanager.NodeManagerServiceClient, error) { func newManagerClient(endpoint string) (omanager.NodeManagerServiceClient, error) {
client, err := grpc.Dial(endpoint, client, err := grpc.Dial(endpoint,
...@@ -82,7 +78,8 @@ func parseWorkerId(elem string) (string, int64) { ...@@ -82,7 +78,8 @@ func parseWorkerId(elem string) (string, int64) {
strs := strings.Split(elem, split) strs := strings.Split(elem, split)
if len(strs) == 2 { if len(strs) == 2 {
addr := strs[0] addr := strs[0]
nonce, _ := strconv.ParseInt(strs[1], 10, 64) nonceds := strings.Split(strs[1], ":")
nonce, _ := strconv.ParseInt(nonceds[0], 10, 64)
return addr, nonce return addr, nonce
} }
return "", 0 return "", 0
...@@ -109,7 +106,7 @@ func (n *Node) DispatchTask(ctx context.Context, worker Worker, task *odysseus.T ...@@ -109,7 +106,7 @@ func (n *Node) DispatchTask(ctx context.Context, worker Worker, task *odysseus.T
defer func(w Worker) { defer func(w Worker) {
if shouldAddBack { if shouldAddBack {
// add worker back to redis queue. // add worker back to redis queue.
n.rdb.LPush(context.Background(), config.WORKER_QUEUE_PREFIX+strconv.Itoa(w.priority), workerId(w)) n.rdb.LPush(context.Background(), config.WORKER_QUEUE_PREFIX+strconv.Itoa(w.priority), w.workerid)
l.WithField("worker", worker.workerid).Debug("add worker back to queue") l.WithField("worker", worker.workerid).Debug("add worker back to queue")
} }
}(worker) }(worker)
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment