Commit 75128321 authored by vicotor's avatar vicotor

update log

parent b5edf8ad
...@@ -29,10 +29,11 @@ func newDispatchTask(w *Worker, task *odysseus.TaskContent) *dispatchTask { ...@@ -29,10 +29,11 @@ func newDispatchTask(w *Worker, task *odysseus.TaskContent) *dispatchTask {
result: make(chan *omanager.SubmitTaskResult, 1), result: make(chan *omanager.SubmitTaskResult, 1),
} }
} }
func (d *dispatchTask) dispatched(errCh chan error, wm *WorkerManager) { func (d *dispatchTask) dispatched(wm *WorkerManager) {
tm := time.NewTicker(time.Second * 5) tm := time.NewTicker(time.Second * 5)
defer tm.Stop() defer tm.Stop()
d.setStatus(TASK_WAIT_ACK) d.setStatus(TASK_WAIT_ACK)
t1 := time.Now()
select { select {
case ackMsg, ok := <-d.ack: case ackMsg, ok := <-d.ack:
...@@ -46,6 +47,7 @@ func (d *dispatchTask) dispatched(errCh chan error, wm *WorkerManager) { ...@@ -46,6 +47,7 @@ func (d *dispatchTask) dispatched(errCh chan error, wm *WorkerManager) {
"bootup-time": msg.BootUpTime, "bootup-time": msg.BootUpTime,
"queue-time": msg.QueueWaitTime, "queue-time": msg.QueueWaitTime,
"execute-time": msg.ExecuteTime, "execute-time": msg.ExecuteTime,
"ackcost": time.Since(t1).Milliseconds(),
}).Debug("got ack message") }).Debug("got ack message")
if !msg.CanExecute { if !msg.CanExecute {
d.errCh <- errors.New("worker can't execute task") d.errCh <- errors.New("worker can't execute task")
...@@ -56,8 +58,6 @@ func (d *dispatchTask) dispatched(errCh chan error, wm *WorkerManager) { ...@@ -56,8 +58,6 @@ func (d *dispatchTask) dispatched(errCh chan error, wm *WorkerManager) {
} }
case <-tm.C: case <-tm.C:
d.errCh <- errors.New("ack timeout") d.errCh <- errors.New("ack timeout")
case err := <-errCh:
d.errCh <- err
} }
if d.status == TASK_ACKED { if d.status == TASK_ACKED {
......
...@@ -348,9 +348,9 @@ func (wm *WorkerManager) manageWorker(worker *Worker) error { ...@@ -348,9 +348,9 @@ func (wm *WorkerManager) manageWorker(worker *Worker) error {
msg.Message = taskMsg msg.Message = taskMsg
errCh := make(chan error, 1) errCh := make(chan error, 1)
go dtask.dispatched(errCh, wm)
callback = func(err error) bool { callback = func(err error) bool {
if err == nil { if err == nil {
go dtask.dispatched(wm)
// add task to cache. // add task to cache.
worker.recentTask.Add(task.TaskId, dtask) worker.recentTask.Add(task.TaskId, dtask)
if task.TaskKind == odysseus.TaskKind_ComputeTask { if task.TaskKind == odysseus.TaskKind_ComputeTask {
...@@ -365,7 +365,7 @@ func (wm *WorkerManager) manageWorker(worker *Worker) error { ...@@ -365,7 +365,7 @@ func (wm *WorkerManager) manageWorker(worker *Worker) error {
"worker addr": worker.workerAddr, "worker addr": worker.workerAddr,
}).Info("dispatch task to worker") }).Info("dispatch task to worker")
if err != nil { if err != nil {
errCh <- err dtask.errCh <- err
} }
return true return true
} }
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment