Commit e24e80a2 authored by vicotor's avatar vicotor

fix bug

parent ddd03ae2
...@@ -228,8 +228,9 @@ func (n *Node) Loop(idx int) { ...@@ -228,8 +228,9 @@ func (n *Node) Loop(idx int) {
go func(ctx context.Context, task *odysseus.TaskContent) { go func(ctx context.Context, task *odysseus.TaskContent) {
l := log.WithField("task-id", task.TaskId) l := log.WithField("task-id", task.TaskId)
l.WithField("task", task).Info("get task") l.WithField("task", task).Info("get task")
checkedWorker := make(map[string]bool)
for { for {
worker, err := n.PopWorker(ctx, n.rdb, t) worker, err := n.PopWorker(ctx, n.rdb, t, checkedWorker)
if err == ErrNoWorker || err == ErrTimeout { if err == ErrNoWorker || err == ErrTimeout {
result := &odysseus.TaskResponse{ result := &odysseus.TaskResponse{
TaskId: task.TaskId, TaskId: task.TaskId,
...@@ -357,8 +358,7 @@ func (n *Node) addWorkerBack(w Worker) { ...@@ -357,8 +358,7 @@ func (n *Node) addWorkerBack(w Worker) {
log.WithField("worker", w.workerid).Debug("add worker back to queue") log.WithField("worker", w.workerid).Debug("add worker back to queue")
} }
func (n *Node) PopWorker(ctx context.Context, rdb *redis.Client, task *odysseus.TaskContent) (Worker, error) { func (n *Node) PopWorker(ctx context.Context, rdb *redis.Client, task *odysseus.TaskContent, checkedWorker map[string]bool) (Worker, error) {
var checkedWorker = make(map[string]bool)
for i := 0; i < maxPriority; i++ { for i := 0; i < maxPriority; i++ {
for { for {
...@@ -396,7 +396,7 @@ func (n *Node) PopWorker(ctx context.Context, rdb *redis.Client, task *odysseus. ...@@ -396,7 +396,7 @@ func (n *Node) PopWorker(ctx context.Context, rdb *redis.Client, task *odysseus.
priority: i, priority: i,
managers: managerList, managers: managerList,
} }
if false { if true {
if !checkWorkerHasResource(rdb, worker.addr, task.TaskType) { if !checkWorkerHasResource(rdb, worker.addr, task.TaskType) {
n.addWorkerBack(worker) n.addWorkerBack(worker)
if checked := checkedWorker[worker.workerid]; checked { if checked := checkedWorker[worker.workerid]; checked {
......
...@@ -26,6 +26,7 @@ var ( ...@@ -26,6 +26,7 @@ var (
ErrNoWorker = errors.New("no worker") ErrNoWorker = errors.New("no worker")
ErrTimeout = errors.New("schedule timeout") ErrTimeout = errors.New("schedule timeout")
ErrDispatchFailed = errors.New("dispatch to nodemanager failed") ErrDispatchFailed = errors.New("dispatch to nodemanager failed")
ErrCannotExecute = errors.New("worker can't execute task")
) )
type Worker struct { type Worker struct {
...@@ -144,6 +145,10 @@ func (n *Node) DispatchTask(ctx context.Context, worker Worker, task *odysseus.T ...@@ -144,6 +145,10 @@ func (n *Node) DispatchTask(ctx context.Context, worker Worker, task *odysseus.T
shouldAddBack = true shouldAddBack = true
return ErrTimeout return ErrTimeout
} }
if strings.HasSuffix(err.Error(), "worker can't execute task") {
shouldAddBack = true
return ErrCannotExecute
}
continue continue
} }
return nil return nil
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment