Commit 605c1eca authored by vicotor's avatar vicotor

update ack message

parent a2607c16
......@@ -35,9 +35,25 @@ func (d *dispatchTask) dispatched(errCh chan error, wm *WorkerManager) {
d.setStatus(TASK_WAIT_ACK)
select {
case <-d.ack:
d.setStatus(TASK_ACKED)
d.errCh <- nil
case ackMsg, ok := <-d.ack:
if !ok {
d.errCh <- errors.New("ack channel closed")
} else {
msg := ackMsg.(*omanager.SubmitTaskAck)
log.WithFields(log.Fields{
"canExecute": msg.CanExecute,
"task-id": msg.TaskId,
"bootup-time": msg.BootUpTime,
"queue-time": msg.QueueWaitTime,
"execute-time": msg.ExecuteTime,
}).Debug("got ack message")
if !msg.CanExecute {
d.errCh <- errors.New("worker can't execute task")
} else {
d.setStatus(TASK_ACKED)
d.errCh <- nil
}
}
case <-tm.C:
d.errCh <- errors.New("ack timeout")
case err := <-errCh:
......@@ -51,9 +67,9 @@ func (d *dispatchTask) dispatched(errCh chan error, wm *WorkerManager) {
return
}
func (d *dispatchTask) setAck() {
func (d *dispatchTask) setAck(msg *omanager.SubmitTaskAck) {
select {
case d.ack <- true:
case d.ack <- msg:
default:
}
}
......
......@@ -451,7 +451,7 @@ func (wm *WorkerManager) handleWorkerMsg(worker *Worker) {
case *omanager.WorkerMessage_SubmitTaskAck:
if v, ok := worker.recentTask.Get(msg.SubmitTaskAck.TaskId); ok {
dtask := v.(*dispatchTask)
dtask.setAck()
dtask.setAck(msg.SubmitTaskAck)
}
case *omanager.WorkerMessage_SubmitTaskResult:
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment