Commit 86a56249 authored by vicotor's avatar vicotor

add dispatch timeout

parent 3ade7f32
...@@ -2,6 +2,7 @@ endpoint="127.0.0.1:10002" ...@@ -2,6 +2,7 @@ endpoint="127.0.0.1:10002"
metrics_port = 28012 metrics_port = 28012
routines = 1 routines = 1
max_nm_update_ex = 40 max_nm_update_ex = 40
dispatch_timeout = 30
[redis] [redis]
addr="127.0.0.1:6379" addr="127.0.0.1:6379"
......
...@@ -26,13 +26,14 @@ type MysqlConfig struct { ...@@ -26,13 +26,14 @@ type MysqlConfig struct {
} }
// Config is the top-level configuration structure; every field carries the
// json and toml key it is decoded from. DispatchTimeout is multiplied by
// time.Second when the per-task dispatch context is built, so it is expressed
// in seconds.
type Config struct { type Config struct {
Endpoint string `json:"endpoint" toml:"endpoint"` Endpoint string `json:"endpoint" toml:"endpoint"`
MetricPort int `json:"metrics_port" toml:"metrics_port"` MetricPort int `json:"metrics_port" toml:"metrics_port"`
Routines int `json:"routines" toml:"routines"` Routines int `json:"routines" toml:"routines"`
MaxNmUpdateEx int `json:"max_nm_update_ex" toml:"max_nm_update_ex"` MaxNmUpdateEx int `json:"max_nm_update_ex" toml:"max_nm_update_ex"`
Redis RedisConfig `json:"redis" toml:"redis"` DispatchTimeout int `json:"dispatch_timeout" toml:"dispatch_timeout"`
Kafka KafkaConfig `json:"kafka" toml:"kafka"` Redis RedisConfig `json:"redis" toml:"redis"`
DbConfig MysqlConfig `json:"mysql" toml:"mysql"` Kafka KafkaConfig `json:"kafka" toml:"kafka"`
DbConfig MysqlConfig `json:"mysql" toml:"mysql"`
} }
var _cfg *Config = nil var _cfg *Config = nil
......
...@@ -169,15 +169,14 @@ func (n *Node) Loop(idx int) { ...@@ -169,15 +169,14 @@ func (n *Node) Loop(idx int) {
receipt.TaskProfitAccount = "" receipt.TaskProfitAccount = ""
receipt.TaskWorkerAccount = "" receipt.TaskWorkerAccount = ""
switch err { switch err {
case ErrNoWorker: case ErrNoWorker, ErrTimeout:
receipt.TaskResult = err.Error() receipt.TaskResult = err.Error()
case ErrDispatchFailed: case ErrDispatchFailed:
receipt.TaskResult = err.Error() receipt.TaskResult = err.Error()
default: default:
receipt.TaskResult = "internal error" receipt.TaskResult = "internal error"
} }
utils.FireTaskReceipt(n.kafkaProducer, receipt, config.GetConfig().Kafka.ReceiptTopic) return utils.FireTaskReceipt(n.kafkaProducer, receipt, config.GetConfig().Kafka.ReceiptTopic)
return nil
} }
for { for {
...@@ -188,13 +187,14 @@ func (n *Node) Loop(idx int) { ...@@ -188,13 +187,14 @@ func (n *Node) Loop(idx int) {
return return
case t := <-taskCh: case t := <-taskCh:
go func(task *odysseus.TaskContent) { fctx, _ := context.WithTimeout(context.Background(), time.Second*time.Duration(config.GetConfig().DispatchTimeout))
go func(ctx context.Context, task *odysseus.TaskContent) {
l := log.WithField("task-id", task.TaskId) l := log.WithField("task-id", task.TaskId)
l.WithField("task", task).Info("get task") l.WithField("task", task).Info("get task")
// todo: add parameter for re-dispatch count. // todo: add parameter for re-dispatch count.
for { for {
worker, err := PopWorker(n.rdb) worker, err := PopWorker(ctx, n.rdb)
if err == ErrNoWorker { if err == ErrNoWorker || err == ErrTimeout {
result := &odysseus.TaskResponse{ result := &odysseus.TaskResponse{
TaskId: task.TaskId, TaskId: task.TaskId,
TaskUid: task.TaskUid, TaskUid: task.TaskUid,
...@@ -203,10 +203,11 @@ func (n *Node) Loop(idx int) { ...@@ -203,10 +203,11 @@ func (n *Node) Loop(idx int) {
TaskError: err.Error(), TaskError: err.Error(),
} }
l.WithError(err).Error("pop worker failed") l.WithError(err).Error("pop worker failed")
postReceipt(task, result, err) if e := postReceipt(task, result, err); e != nil {
err = postResult(task, result) l.WithError(e).Error("post task receipt failed")
if err != nil { }
l.WithError(err).Error("post task result failed") if e := postResult(task, result); e != nil {
l.WithError(e).Error("post task result failed")
} }
break break
} }
...@@ -215,7 +216,7 @@ func (n *Node) Loop(idx int) { ...@@ -215,7 +216,7 @@ func (n *Node) Loop(idx int) {
l.WithError(err).Error("pop worker failed") l.WithError(err).Error("pop worker failed")
continue continue
} }
err = n.DispatchTask(n.rdb, worker, task) err = n.DispatchTask(ctx, worker, task)
if err != nil { if err != nil {
l.WithError(err).Error("dispatch task failed") l.WithError(err).Error("dispatch task failed")
continue continue
...@@ -224,7 +225,7 @@ func (n *Node) Loop(idx int) { ...@@ -224,7 +225,7 @@ func (n *Node) Loop(idx int) {
break break
} }
} }
}(t) }(fctx, t)
} }
} }
......
...@@ -22,6 +22,7 @@ var ( ...@@ -22,6 +22,7 @@ var (
// Sentinel errors for the task dispatch path. ErrTimeout is returned by
// PopWorker once its context reports an error, and by DispatchTask when a
// manager call fails with a message ending in "deadline exceeded".
var ( var (
ErrNoWorker = errors.New("no worker") ErrNoWorker = errors.New("no worker")
ErrTimeout = errors.New("timeout")
ErrDispatchFailed = errors.New("dispatch to nodemanager failed") ErrDispatchFailed = errors.New("dispatch to nodemanager failed")
) )
...@@ -33,9 +34,14 @@ type Worker struct { ...@@ -33,9 +34,14 @@ type Worker struct {
managers []string managers []string
} }
func PopWorker(rdb *redis.Client) (Worker, error) { func PopWorker(ctx context.Context, rdb *redis.Client) (Worker, error) {
for i := 0; i < maxPriority; i++ { for i := 0; i < maxPriority; i++ {
for { for {
if ctx.Err() != nil {
return Worker{}, ErrTimeout
}
elem, err := rdb.LPop(context.Background(), config.WORKER_QUEUE_PREFIX+strconv.Itoa(i)).Result() elem, err := rdb.LPop(context.Background(), config.WORKER_QUEUE_PREFIX+strconv.Itoa(i)).Result()
if err != nil { if err != nil {
log.WithError(err).Error("lPop worker failed") log.WithError(err).Error("lPop worker failed")
...@@ -67,6 +73,9 @@ func PopWorker(rdb *redis.Client) (Worker, error) { ...@@ -67,6 +73,9 @@ func PopWorker(rdb *redis.Client) (Worker, error) {
// workerStatusKey formats config.WORKER_STATUS_PREFIX and wid as "<prefix>_<wid>".
func workerStatusKey(wid string) string { func workerStatusKey(wid string) string {
return fmt.Sprintf("%s_%s", config.WORKER_STATUS_PREFIX, wid) return fmt.Sprintf("%s_%s", config.WORKER_STATUS_PREFIX, wid)
} }
// workerId formats the worker's addr and nonce as "<addr>_<nonce>".
// DispatchTask pushes this value back onto the worker's priority queue when a
// dispatch attempt times out.
// NOTE(review): presumably this matches the element format PopWorker parses
// from the queue — confirm against the parsing code.
func workerId(w Worker) string {
return fmt.Sprintf("%s_%d", w.addr, w.nonce)
}
func newManagerClient(endpoint string) (omanager.NodeManagerServiceClient, error) { func newManagerClient(endpoint string) (omanager.NodeManagerServiceClient, error) {
client, err := grpc.Dial(endpoint, client, err := grpc.Dial(endpoint,
...@@ -105,12 +114,21 @@ func parseWorkerNmValue(nmValue string) (string, int64) { ...@@ -105,12 +114,21 @@ func parseWorkerNmValue(nmValue string) (string, int64) {
return "", 0 return "", 0
} }
func (n *Node) DispatchTask(rdb *redis.Client, worker Worker, task *odysseus.TaskContent) error { func (n *Node) DispatchTask(ctx context.Context, worker Worker, task *odysseus.TaskContent) error {
l := log.WithField("task-id", task.TaskId) l := log.WithField("task-id", task.TaskId)
l.WithFields(log.Fields{ l.WithFields(log.Fields{
"worker": worker.workerid, "worker": worker.workerid,
"managerList": worker.managers, "managerList": worker.managers,
}).Debug("dispatch task to worker") }).Debug("dispatch task to worker")
var shouldAddBack = false
defer func(w Worker) {
if shouldAddBack {
// add worker back to redis queue.
n.rdb.LPush(context.Background(), config.WORKER_QUEUE_PREFIX+strconv.Itoa(w.priority), workerId(w))
l.WithField("worker", worker.workerid).Debug("add worker back to queue")
}
}(worker)
for _, manager := range worker.managers { for _, manager := range worker.managers {
endpoint, updateTime := parseWorkerNmValue(manager) endpoint, updateTime := parseWorkerNmValue(manager)
if time.Now().Unix()-updateTime > int64(config.GetConfig().MaxNmUpdateEx) { if time.Now().Unix()-updateTime > int64(config.GetConfig().MaxNmUpdateEx) {
...@@ -126,7 +144,7 @@ func (n *Node) DispatchTask(rdb *redis.Client, worker Worker, task *odysseus.Tas ...@@ -126,7 +144,7 @@ func (n *Node) DispatchTask(rdb *redis.Client, worker Worker, task *odysseus.Tas
}).Error("connect to manager failed") }).Error("connect to manager failed")
continue continue
} }
_, err = client.DispatchTask(context.Background(), &omanager.DispatchTaskRequest{ _, err = client.DispatchTask(ctx, &omanager.DispatchTaskRequest{
Miner: worker.workerid, Miner: worker.workerid,
TaskData: task, TaskData: task,
}) })
...@@ -135,7 +153,10 @@ func (n *Node) DispatchTask(rdb *redis.Client, worker Worker, task *odysseus.Tas ...@@ -135,7 +153,10 @@ func (n *Node) DispatchTask(rdb *redis.Client, worker Worker, task *odysseus.Tas
"manager": endpoint, "manager": endpoint,
"error": err, "error": err,
}).Error("dispatch to manager failed") }).Error("dispatch to manager failed")
if strings.HasSuffix(err.Error(), "deadline exceeded") {
shouldAddBack = true
return ErrTimeout
}
continue continue
} }
return nil return nil
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment