Commit d4bda8fa authored by vicotor's avatar vicotor

fix bug

parent e005f062
......@@ -28,45 +28,86 @@ func newDispatchTask(w *Worker, task *odysseus.TaskContent) *dispatchTask {
task: task,
status: TASK_CREATE,
ack: make(chan interface{}, 1),
errCh: make(chan error, 1),
errCh: make(chan error, 10),
result: make(chan *omanager.SubmitTaskResult, 1),
}
}
func (d *dispatchTask) dispatched(wm *WorkerManager) {
tm := time.NewTicker(time.Second * 5)
defer tm.Stop()
d.setStatus(TASK_WAIT_ACK)
t1 := time.Now()
ackTicker := time.NewTicker(time.Second * 5)
defer ackTicker.Stop()
select {
case ackMsg, ok := <-d.ack:
if !ok {
d.errCh <- errors.New("ack channel closed")
} else {
maxExec := d.task.TaskMaxExecTime
if maxExec <= 0 {
maxExec = 300 // set default to 5min.
}
resultTicker := time.NewTicker(time.Second * time.Duration(maxExec))
defer resultTicker.Stop()
d.setStatus(TASK_WAIT_ACK)
var (
t1 = time.Now()
l = log.WithField("task-id", d.task.TaskId)
result *omanager.SubmitTaskResult
wait = true
)
for wait {
select {
case ackMsg := <-d.ack:
msg := ackMsg.(*omanager.SubmitTaskAck)
log.WithFields(log.Fields{
l.WithFields(log.Fields{
"canExecute": msg.CanExecute,
"task-id": msg.TaskId,
"bootup-time": msg.BootUpTime,
"queue-time": msg.QueueWaitTime,
"execute-time": msg.ExecuteTime,
"ackcost": time.Since(t1).Milliseconds(),
}).Debug("got ack message")
if !msg.CanExecute {
d.errCh <- errors.New("worker can't execute task")
} else {
if msg.CanExecute {
d.setStatus(TASK_ACKED)
d.errCh <- nil
wait = true
} else {
// stop wait and return.
d.errCh <- errors.New("worker can't execute task")
wait = false
}
case <-ackTicker.C:
d.setStatus(TASK_ACK_TIMEOUT)
d.errCh <- errors.New("ack timeout")
wait = false
case <-resultTicker.C:
d.setStatus(TASK_TIMEOUT)
result = &omanager.SubmitTaskResult{
TaskId: d.task.TaskId,
IsSuccessed: false,
TaskExecuteDuration: uint64(maxExec * 1000 * 1000), // 微秒
TaskExecuteError: ErrExecuteTimeout.Error(),
}
wait = false
case r := <-d.result:
d.errCh <- nil
d.setStatus(TASK_FINISHED)
result = r
wait = false
}
case <-tm.C:
d.errCh <- errors.New("ack timeout")
}
d.resultTime = time.Now()
l.WithFields(log.Fields{
"totaltime": d.resultTime.Sub(d.create).Milliseconds(),
"status": d.status,
}).Debug("task finished")
if d.status == TASK_ACKED {
// wait timeout or got result
d.finalize(wm)
if result != nil {
_, err := wm.taskResult(d.worker, d.task, result)
if err != nil {
log.WithError(err).Error("task result failed")
}
if d.task.TaskKind != odysseus.TaskKind_StandardTask {
wm.Payment(d.task)
}
}
return
}
......@@ -89,48 +130,3 @@ func (d *dispatchTask) setStatus(status TaskStatus) {
d.status = status
d.mux.Unlock()
}
func (d *dispatchTask) finalize(wm *WorkerManager) {
maxExec := d.task.TaskMaxExecTime
if maxExec <= 0 {
maxExec = 300 // set default to 5min.
}
l := log.WithField("task-id", d.task.TaskId)
ticker := time.NewTicker(time.Second * time.Duration(maxExec))
defer ticker.Stop()
var result *omanager.SubmitTaskResult
select {
case <-ticker.C:
d.setStatus(TASK_TIMEOUT)
result = &omanager.SubmitTaskResult{
TaskId: d.task.TaskId,
IsSuccessed: false,
TaskExecuteDuration: uint64(maxExec * 1000 * 1000), // 微秒
TaskExecuteError: ErrExecuteTimeout.Error(),
}
case r := <-d.result:
d.setStatus(TASK_FINISHED)
result = r
}
d.resultTime = time.Now()
l.WithFields(log.Fields{
"totaltime": d.resultTime.Sub(d.create).Milliseconds(),
"status": d.status,
}).Debug("task finished")
task := d.task
if task.TaskKind != odysseus.TaskKind_StandardTask && d.worker.online == true {
}
_, err := wm.taskResult(d.worker, task, result)
if err != nil {
log.WithError(err).Error("task result failed")
}
if task.TaskKind != odysseus.TaskKind_StandardTask {
wm.Payment(task)
}
}
......@@ -41,6 +41,7 @@ const (
TASK_CREATE TaskStatus = iota
TASK_WAIT_ACK
TASK_ACKED
TASK_ACK_TIMEOUT
TASK_FINISHED
TASK_TIMEOUT
)
......@@ -346,12 +347,12 @@ func (wm *WorkerManager) manageWorker(worker *Worker) error {
TaskParam: task.TaskParam,
}
msg.Message = taskMsg
// add task to cache.
worker.recentTask.Add(task.TaskId, dtask)
go dtask.dispatched(wm)
callback = func(err error) bool {
if err == nil {
go dtask.dispatched(wm)
// add task to cache.
worker.recentTask.Add(task.TaskId, dtask)
if task.TaskKind == odysseus.TaskKind_ComputeTask {
if e := wm.setWorkerLastTaskTime(worker, time.Now().Unix()); e != nil {
log.WithField("worker", worker.uuid).WithError(e).Error("set worker last task time failed")
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment