Commit c7a2755c authored by vicotor's avatar vicotor

update task nm

parent efad1e48
package server
import (
"errors"
odysseus "github.com/odysseus/odysseus-protocol/gen/proto/go/base/v1"
omanager "github.com/odysseus/odysseus-protocol/gen/proto/go/nodemanager/v1"
log "github.com/sirupsen/logrus"
"sync"
"time"
)
type dispatchTask struct {
worker *Worker
task *odysseus.TaskContent
status TaskStatus
ack chan interface{}
errCh chan error
mux sync.Mutex
result chan *omanager.SubmitTaskResult
}
func newDispatchTask(w *Worker, task *odysseus.TaskContent) *dispatchTask {
return &dispatchTask{
worker: w,
task: task,
status: TASK_CREATE,
ack: make(chan interface{}, 1),
errCh: make(chan error, 1),
result: make(chan *omanager.SubmitTaskResult, 1),
}
}
func (d *dispatchTask) dispatched(errCh chan error, wm *WorkerManager) {
tm := time.NewTicker(time.Second * 5)
defer tm.Stop()
d.setStatus(TASK_WAIT_ACK)
select {
case <-d.ack:
d.setStatus(TASK_ACKED)
d.errCh <- nil
case <-tm.C:
d.errCh <- errors.New("ack timeout")
case err := <-errCh:
d.errCh <- err
}
if d.status == TASK_ACKED {
// wait timeout or got result
d.finalize(wm)
}
return
}
func (d *dispatchTask) setAck() {
select {
case d.ack <- true:
default:
}
}
func (d *dispatchTask) setResult(result *omanager.SubmitTaskResult) {
select {
case d.result <- result:
default:
}
}
func (d *dispatchTask) setStatus(status TaskStatus) {
d.mux.Lock()
d.status = status
d.mux.Unlock()
}
func (d *dispatchTask) finalize(wm *WorkerManager) {
maxExec := d.task.TaskMaxExecTime
if maxExec <= 0 {
maxExec = 300 // set default to 5min.
}
l := log.WithField("task-id", d.task.TaskId)
ticker := time.NewTicker(time.Second * time.Duration(maxExec))
defer ticker.Stop()
var result *omanager.SubmitTaskResult
select {
case <-ticker.C:
l.WithField("worker", d.worker.workerAddr).Info("task timeout")
d.setStatus(TASK_TIMEOUT)
result = &omanager.SubmitTaskResult{
TaskId: d.task.TaskId,
IsSuccessed: false,
TaskExecuteDuration: uint64(maxExec * 1000 * 1000), // 微秒
TaskExecuteError: ErrExecuteTimeout.Error(),
}
case r := <-d.result:
l.WithField("worker", d.worker.workerAddr).Info("task finished")
d.setStatus(TASK_FINISHED)
result = r
}
task := d.task
if task.TaskKind != odysseus.TaskKind_StandardTask && d.worker.online == true {
_ = wm.AddWorkerSingle(d.worker)
}
_, err := wm.taskResult(d.worker, task, result)
if err != nil {
log.WithError(err).Error("task result failed")
}
if task.TaskKind != odysseus.TaskKind_StandardTask {
wm.Payment(task)
}
}
......@@ -139,6 +139,18 @@ func (n *Node) PostProof(proof *basev1.TaskProof) {
n.taskProofCh <- proof
}
func (n *Node) Start() error {
go n.registry.Start()
go n.register.Start()
go n.postLoop()
if err := n.apiStart(); err != nil {
return err
}
n.SetStatus("running")
return nil
}
func (n *Node) postLoop() {
for {
select {
......@@ -160,18 +172,6 @@ func (n *Node) postLoop() {
}
}
func (n *Node) Start() error {
go n.registry.Start()
go n.register.Start()
go n.postLoop()
if err := n.apiStart(); err != nil {
return err
}
n.SetStatus("running")
return nil
}
func (n *Node) Stop() {
n.registry.Clear()
n.registry.Stop()
......
......@@ -72,11 +72,7 @@ func (n *NodeManagerService) DispatchTask(ctx context.Context, request *omanager
}
}
dtask := &dispatchTask{
task: request.TaskData,
errCh: make(chan error, 1),
ack: make(chan interface{}, 1),
}
dtask := newDispatchTask(worker, request.TaskData)
worker.taskCh <- dtask
......
......@@ -12,8 +12,8 @@ import (
)
var (
ErrExecuteFailed = errors.New("execute failed")
ErrWorkerFailed = errors.New("worker failed")
ErrExecuteTimeout = errors.New("execute timeout")
ErrExecuteFailed = errors.New("execute failed")
)
func (wm *WorkerManager) taskResult(worker *Worker, task *odysseus.TaskContent, result *omanager.SubmitTaskResult) (*odysseus.TaskProof, error) {
......
......@@ -33,11 +33,15 @@ var (
ErrInvalidMessageValue = errors.New("invalid message value")
)
type dispatchTask struct {
task *odysseus.TaskContent
ack chan interface{}
errCh chan error
}
type TaskStatus int
const (
TASK_CREATE TaskStatus = iota
TASK_WAIT_ACK
TASK_ACKED
TASK_FINISHED
TASK_TIMEOUT
)
type workerInfo struct {
nodeInfo *omanager.NodeInfoResponse
......@@ -370,18 +374,7 @@ func (wm *WorkerManager) manageWorker(worker *Worker) error {
msg.Message = taskMsg
errCh := make(chan error, 1)
go func() {
tm := time.NewTicker(time.Second * 5)
defer tm.Stop()
select {
case <-tm.C:
dtask.errCh <- errors.New("ack timeout")
case err := <-errCh:
dtask.errCh <- err
}
return
}()
go dtask.dispatched(errCh, wm)
callback = func(err error) bool {
if err == nil {
// add task to cache.
......@@ -397,7 +390,9 @@ func (wm *WorkerManager) manageWorker(worker *Worker) error {
"worker": worker.uuid,
"worker addr": worker.workerAddr,
}).Info("dispatch task to worker")
errCh <- err
if err != nil {
errCh <- err
}
return true
}
case result := <-worker.resultCh:
......@@ -415,30 +410,17 @@ func (wm *WorkerManager) manageWorker(worker *Worker) error {
continue
}
dtask := data.(*dispatchTask)
task := dtask.task
if result.TaskId != task.TaskId {
log.WithField("worker", worker.uuid).Error("task id not match")
continue
}
if task.TaskKind != odysseus.TaskKind_StandardTask {
_ = wm.AddWorkerSingle(worker)
}
proof, err := wm.taskResult(worker, task, result)
if err != nil {
log.WithError(err).Error("task result failed")
continue
}
_ = proof
worker.recentTask.Remove(result.TaskId)
if task.TaskKind != odysseus.TaskKind_StandardTask {
wm.Payment(task)
if dtask.status < TASK_FINISHED {
dtask.setResult(result)
} else {
log.WithFields(log.Fields{
"task": result.TaskId,
"worker addr": worker.workerAddr,
}).Warn("task is timeout")
}
}
if msg != nil {
if msg.Message != nil {
err := worker.stream.Send(msg)
if err != nil {
log.WithError(err).Error("send message to worker failed")
......@@ -502,11 +484,7 @@ func (wm *WorkerManager) handleWorkerMsg(worker *Worker) {
case *omanager.WorkerMessage_SubmitTaskAck:
if v, ok := worker.recentTask.Get(msg.SubmitTaskAck.TaskId); ok {
dtask := v.(*dispatchTask)
select {
case dtask.ack <- true:
log.WithField("task", dtask.task.TaskId).Debug("got ack")
default:
}
dtask.setAck()
}
case *omanager.WorkerMessage_SubmitTaskResult:
......@@ -610,10 +588,8 @@ func (wm *WorkerManager) handleWorkerMsg(worker *Worker) {
pushTask.TaskTimestamp = uint64(time.Now().UnixNano())
pushTask.TaskKind = odysseus.TaskKind_StandardTask
pushTask.TaskFee = "0"
worker.taskCh <- &dispatchTask{
task: &pushTask.TaskContent,
errCh: make(chan error, 1),
}
dtask := newDispatchTask(worker, &pushTask.TaskContent)
worker.taskCh <- dtask
break
} else {
l.WithField("task-type", msg.FetchStandardTask.TaskType).Warn("not found std task")
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment