Commit 4ff397ed authored by vicotor's avatar vicotor

fix bug

parent 5951098a
...@@ -10,6 +10,7 @@ import ( ...@@ -10,6 +10,7 @@ import (
"github.com/odysseus/cache/model" "github.com/odysseus/cache/model"
odysseus "github.com/odysseus/odysseus-protocol/gen/proto/go/base/v1" odysseus "github.com/odysseus/odysseus-protocol/gen/proto/go/base/v1"
"github.com/odysseus/scheduler/config" "github.com/odysseus/scheduler/config"
"github.com/odysseus/scheduler/types"
"github.com/odysseus/scheduler/utils" "github.com/odysseus/scheduler/utils"
"github.com/odysseus/scheduler/workerpoper" "github.com/odysseus/scheduler/workerpoper"
"github.com/odysseus/service-registry/common" "github.com/odysseus/service-registry/common"
...@@ -183,7 +184,7 @@ func (n *Node) postReceipt(task *odysseus.TaskContent, result *odysseus.TaskResp ...@@ -183,7 +184,7 @@ func (n *Node) postReceipt(task *odysseus.TaskContent, result *odysseus.TaskResp
receipt.TaskProfitAccount = "" receipt.TaskProfitAccount = ""
receipt.TaskWorkerAccount = "" receipt.TaskWorkerAccount = ""
switch err { switch err {
case ErrNoWorker, ErrTimeout: case types.ErrNoWorker, types.ErrTimeout:
receipt.TaskResult = err.Error() receipt.TaskResult = err.Error()
case ErrDispatchFailed: case ErrDispatchFailed:
receipt.TaskResult = err.Error() receipt.TaskResult = err.Error()
...@@ -239,7 +240,7 @@ func (n *Node) Loop(idx int) { ...@@ -239,7 +240,7 @@ func (n *Node) Loop(idx int) {
failed := make(map[string]bool) failed := make(map[string]bool)
for { for {
worker, err := n.poper.PopWorker(ctx, n.rdb, t, failed) worker, err := n.poper.PopWorker(ctx, n.rdb, t, failed)
if err == workerpoper.ErrNoWorker || err == workerpoper.ErrTimeout { if err == types.ErrNoWorker || err == types.ErrTimeout {
result := &odysseus.TaskResponse{ result := &odysseus.TaskResponse{
TaskId: task.TaskId, TaskId: task.TaskId,
TaskUid: task.TaskUid, TaskUid: task.TaskUid,
......
...@@ -84,7 +84,7 @@ func (n *Node) DispatchTask(ctx context.Context, worker types.Worker, task *odys ...@@ -84,7 +84,7 @@ func (n *Node) DispatchTask(ctx context.Context, worker types.Worker, task *odys
}).Error("dispatch to manager failed") }).Error("dispatch to manager failed")
if strings.HasSuffix(err.Error(), "deadline exceeded") { if strings.HasSuffix(err.Error(), "deadline exceeded") {
shouldAddBack = true shouldAddBack = true
return ErrTimeout return types.ErrTimeout
} }
continue continue
} }
......
package types package types
import "errors"
var (
ErrNoWorker = errors.New("no worker")
ErrTimeout = errors.New("schedule timeout")
)
type Worker struct { type Worker struct {
Workerid string Workerid string
Addr string Addr string
......
...@@ -53,7 +53,7 @@ func (p *poperV1) PopWorker(ctx context.Context, rdb *redis.Client, task *odysse ...@@ -53,7 +53,7 @@ func (p *poperV1) PopWorker(ctx context.Context, rdb *redis.Client, task *odysse
for i := 0; i < maxPriority; i++ { for i := 0; i < maxPriority; i++ {
for { for {
if ctx.Err() != nil { if ctx.Err() != nil {
return types.Worker{}, ErrTimeout return types.Worker{}, types.ErrTimeout
} }
elem, err := rdb.LPop(context.Background(), config.WORKER_QUEUE_PREFIX+strconv.Itoa(i)).Result() elem, err := rdb.LPop(context.Background(), config.WORKER_QUEUE_PREFIX+strconv.Itoa(i)).Result()
...@@ -100,7 +100,7 @@ func (p *poperV1) PopWorker(ctx context.Context, rdb *redis.Client, task *odysse ...@@ -100,7 +100,7 @@ func (p *poperV1) PopWorker(ctx context.Context, rdb *redis.Client, task *odysse
return worker, nil return worker, nil
} }
} }
return types.Worker{}, ErrNoWorker return types.Worker{}, types.ErrNoWorker
} }
func parseWorkerId(elem string) (string, int64) { func parseWorkerId(elem string) (string, int64) {
......
...@@ -46,7 +46,7 @@ func (p *poperV2) AddBack(w types.Worker) { ...@@ -46,7 +46,7 @@ func (p *poperV2) AddBack(w types.Worker) {
func (p *poperV2) PopWorker(ctx context.Context, rdb *redis.Client, task *odysseus.TaskContent, ex map[string]bool) (types.Worker, error) { func (p *poperV2) PopWorker(ctx context.Context, rdb *redis.Client, task *odysseus.TaskContent, ex map[string]bool) (types.Worker, error) {
// 1. select from running model worker. // 1. select from running model worker.
if ctx.Err() != nil { if ctx.Err() != nil {
return types.Worker{}, ErrTimeout return types.Worker{}, types.ErrTimeout
} }
workers, err := p.workerRunningOperator.FindWorkerByModelId(ctx, int(task.TaskType), 10) workers, err := p.workerRunningOperator.FindWorkerByModelId(ctx, int(task.TaskType), 10)
if err != nil { if err != nil {
...@@ -123,5 +123,5 @@ func (p *poperV2) PopWorker(ctx context.Context, rdb *redis.Client, task *odysse ...@@ -123,5 +123,5 @@ func (p *poperV2) PopWorker(ctx context.Context, rdb *redis.Client, task *odysse
return worker, nil return worker, nil
} }
return types.Worker{}, ErrNoWorker return types.Worker{}, types.ErrNoWorker
} }
...@@ -2,7 +2,6 @@ package workerpoper ...@@ -2,7 +2,6 @@ package workerpoper
import ( import (
"context" "context"
"errors"
odysseus "github.com/odysseus/odysseus-protocol/gen/proto/go/base/v1" odysseus "github.com/odysseus/odysseus-protocol/gen/proto/go/base/v1"
"github.com/odysseus/scheduler/config" "github.com/odysseus/scheduler/config"
"github.com/odysseus/scheduler/types" "github.com/odysseus/scheduler/types"
...@@ -10,11 +9,6 @@ import ( ...@@ -10,11 +9,6 @@ import (
log "github.com/sirupsen/logrus" log "github.com/sirupsen/logrus"
) )
var (
ErrNoWorker = errors.New("no worker")
ErrTimeout = errors.New("schedule timeout")
)
type WorkerPoper interface { type WorkerPoper interface {
CanAddBack() bool CanAddBack() bool
AddBack(worker types.Worker) AddBack(worker types.Worker)
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment