dispatchTask.go 3 KB
package server

import (
	"errors"
	odysseus "github.com/odysseus/odysseus-protocol/gen/proto/go/base/v1"
	omanager "github.com/odysseus/odysseus-protocol/gen/proto/go/nodemanager/v1"
	log "github.com/sirupsen/logrus"
	"sync"
	"time"
)

type dispatchTask struct {
	worker *Worker
	task   *odysseus.TaskContent
	status TaskStatus
	ack    chan interface{}
	errCh  chan error
	mux    sync.Mutex
	result chan *omanager.SubmitTaskResult
}

func newDispatchTask(w *Worker, task *odysseus.TaskContent) *dispatchTask {
	return &dispatchTask{
		worker: w,
		task:   task,
		status: TASK_CREATE,
		ack:    make(chan interface{}, 1),
		errCh:  make(chan error, 1),
		result: make(chan *omanager.SubmitTaskResult, 1),
	}
}
func (d *dispatchTask) dispatched(errCh chan error, wm *WorkerManager) {
	tm := time.NewTicker(time.Second * 5)
	defer tm.Stop()
	d.setStatus(TASK_WAIT_ACK)

	select {
	case ackMsg, ok := <-d.ack:
		if !ok {
			d.errCh <- errors.New("ack channel closed")
		} else {
			msg := ackMsg.(*omanager.SubmitTaskAck)
			log.WithFields(log.Fields{
				"canExecute":   msg.CanExecute,
				"task-id":      msg.TaskId,
				"bootup-time":  msg.BootUpTime,
				"queue-time":   msg.QueueWaitTime,
				"execute-time": msg.ExecuteTime,
			}).Debug("got ack message")
			if !msg.CanExecute {
				d.errCh <- errors.New("worker can't execute task")
			} else {
				d.setStatus(TASK_ACKED)
				d.errCh <- nil
			}
		}
	case <-tm.C:
		d.errCh <- errors.New("ack timeout")
	case err := <-errCh:
		d.errCh <- err
	}

	if d.status == TASK_ACKED {
		// wait timeout or got result
		d.finalize(wm)
	}
	return

}
func (d *dispatchTask) setAck(msg *omanager.SubmitTaskAck) {
	select {
	case d.ack <- msg:
	default:
	}
}

func (d *dispatchTask) setResult(result *omanager.SubmitTaskResult) {
	select {
	case d.result <- result:
	default:
	}
}

func (d *dispatchTask) setStatus(status TaskStatus) {
	d.mux.Lock()
	d.status = status
	d.mux.Unlock()
}

func (d *dispatchTask) finalize(wm *WorkerManager) {
	maxExec := d.task.TaskMaxExecTime
	if maxExec <= 0 {
		maxExec = 300 // set default to 5min.
	}
	l := log.WithField("task-id", d.task.TaskId)
	ticker := time.NewTicker(time.Second * time.Duration(maxExec))
	defer ticker.Stop()
	var result *omanager.SubmitTaskResult
	select {
	case <-ticker.C:
		l.WithField("worker", d.worker.workerAddr).Info("task timeout")
		d.setStatus(TASK_TIMEOUT)
		result = &omanager.SubmitTaskResult{
			TaskId:              d.task.TaskId,
			IsSuccessed:         false,
			TaskExecuteDuration: uint64(maxExec * 1000 * 1000), // 微秒
			TaskExecuteError:    ErrExecuteTimeout.Error(),
		}

	case r := <-d.result:
		l.WithField("worker", d.worker.workerAddr).Info("task finished")
		d.setStatus(TASK_FINISHED)
		result = r
	}

	task := d.task
	if task.TaskKind != odysseus.TaskKind_StandardTask && d.worker.online == true {
		_ = wm.AddWorkerSingle(d.worker)
	}

	_, err := wm.taskResult(d.worker, task, result)
	if err != nil {
		log.WithError(err).Error("task result failed")
	}

	if task.TaskKind != odysseus.TaskKind_StandardTask {
		wm.Payment(task)
	}

}