Commit 84a0c188 authored by vicotor's avatar vicotor

add task ack

parent ccfad169
...@@ -66,6 +66,7 @@ func (n *NodeManagerService) DispatchTask(ctx context.Context, request *omanager ...@@ -66,6 +66,7 @@ func (n *NodeManagerService) DispatchTask(ctx context.Context, request *omanager
dtask := &dispatchTask{ dtask := &dispatchTask{
task: request.TaskData, task: request.TaskData,
errCh: make(chan error, 1), errCh: make(chan error, 1),
ack: make(chan interface{}, 1),
} }
worker.taskCh <- dtask worker.taskCh <- dtask
......
...@@ -35,6 +35,7 @@ var ( ...@@ -35,6 +35,7 @@ var (
type dispatchTask struct { type dispatchTask struct {
task *odysseus.TaskContent task *odysseus.TaskContent
ack chan interface{}
errCh chan error errCh chan error
} }
...@@ -367,10 +368,24 @@ func (wm *WorkerManager) manageWorker(worker *Worker) error { ...@@ -367,10 +368,24 @@ func (wm *WorkerManager) manageWorker(worker *Worker) error {
TaskParam: task.TaskParam, TaskParam: task.TaskParam,
} }
msg.Message = taskMsg msg.Message = taskMsg
errCh := make(chan error, 1)
go func() {
tm := time.NewTicker(time.Second * 5)
defer tm.Stop()
select {
case <-tm.C:
dtask.errCh <- errors.New("ack timeout")
case err := <-errCh:
dtask.errCh <- err
}
return
}()
callback = func(err error) bool { callback = func(err error) bool {
if err == nil { if err == nil {
// add task to cache. // add task to cache.
worker.recentTask.Add(task.TaskId, task) worker.recentTask.Add(task.TaskId, dtask)
if task.TaskKind == odysseus.TaskKind_ComputeTask { if task.TaskKind == odysseus.TaskKind_ComputeTask {
if e := wm.setWorkerLastTaskTime(worker, time.Now().Unix()); e != nil { if e := wm.setWorkerLastTaskTime(worker, time.Now().Unix()); e != nil {
log.WithField("worker", worker.uuid).WithError(e).Error("set worker last task time failed") log.WithField("worker", worker.uuid).WithError(e).Error("set worker last task time failed")
...@@ -382,12 +397,7 @@ func (wm *WorkerManager) manageWorker(worker *Worker) error { ...@@ -382,12 +397,7 @@ func (wm *WorkerManager) manageWorker(worker *Worker) error {
"worker": worker.uuid, "worker": worker.uuid,
"worker addr": worker.workerAddr, "worker addr": worker.workerAddr,
}).Info("dispatch task to worker") }).Info("dispatch task to worker")
errCh <- err
select {
case dtask.errCh <- err:
default:
// err ch is invalid
}
return true return true
} }
case result := <-worker.resultCh: case result := <-worker.resultCh:
...@@ -488,6 +498,15 @@ func (wm *WorkerManager) handleWorkerMsg(worker *Worker) { ...@@ -488,6 +498,15 @@ func (wm *WorkerManager) handleWorkerMsg(worker *Worker) {
worker.quit <- msg.GoodbyeMessage.Reason worker.quit <- msg.GoodbyeMessage.Reason
close(worker.taskCh) close(worker.taskCh)
return return
case *omanager.WorkerMessage_SubmitTaskAck:
if v, ok := worker.recentTask.Get(msg.SubmitTaskAck.TaskId); ok {
dtask := v.(dispatchTask)
select {
case dtask.ack <- true:
log.WithField("task", dtask.task.TaskId).Debug("got ack")
default:
}
}
case *omanager.WorkerMessage_SubmitTaskResult: case *omanager.WorkerMessage_SubmitTaskResult:
worker.resultCh <- msg.SubmitTaskResult worker.resultCh <- msg.SubmitTaskResult
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment