Commit 006afa97 authored by vicotor's avatar vicotor

update log

parent d274b7aa
......@@ -229,7 +229,6 @@ func (n *Node) Loop(idx int) {
log.WithField("routine", idx).Info("attach kafka consumer success")
n.status = "running"
for running {
select {
case clientErr := <-client.Errors():
log.WithError(clientErr).Error("kafka consumer error")
......@@ -246,43 +245,50 @@ func (n *Node) Loop(idx int) {
go func(ctx context.Context, task *odysseus.TaskContent) {
l := log.WithField("task-id", task.TaskId)
l.WithField("task", task).Info("get task")
failed := make(map[string]bool)
t1 := time.Now()
for {
worker, err := n.poper.PopWorker(ctx, n.rdb, t, failed)
if err == types.ErrNoWorker || err == types.ErrTimeout {
var (
failed = make(map[string]bool)
t1 = time.Now()
stop = false
terr error
)
for !stop {
var worker types.Worker
worker, terr = n.poper.PopWorker(ctx, n.rdb, t, failed)
if terr == types.ErrNoWorker || terr == types.ErrTimeout {
result := &odysseus.TaskResponse{
TaskId: task.TaskId,
TaskUid: task.TaskUid,
TaskFee: task.TaskFee,
TaskIsSucceed: false,
TaskError: err.Error(),
TaskError: terr.Error(),
}
if e := n.postReceipt(task, result, err); e != nil {
l.WithError(e).Error("post task receipt failed")
if terr = n.postReceipt(task, result, terr); terr != nil {
l.WithError(terr).Error("post task receipt failed")
}
if e := n.postResult(task, result); e != nil {
l.WithError(e).Error("post task result failed")
if terr = n.postResult(task, result); terr != nil {
l.WithError(terr).Error("post task result failed")
}
l.WithField("timecost", time.Now().Sub(t1).Milliseconds()).WithError(err).Error("pop worker failed")
break
stop = true
}
if err != nil {
l.WithError(err).Error("pop worker failed")
if terr != nil {
l.WithError(terr).Error("pop worker failed")
continue
}
err = n.DispatchTask(ctx, worker, task)
if err != nil {
l.WithError(err).Error("dispatch task failed")
terr = n.DispatchTask(ctx, worker, task)
if terr != nil {
l.WithError(terr).Error("dispatch task failed")
failed[worker.Workerid] = true
continue
} else {
l.WithField("timecost", time.Now().Sub(t1).Milliseconds()).Info("dispatch task success")
break
stop = true
}
}
l.WithFields(log.Fields{
"timecost": time.Now().Sub(t1).Milliseconds(),
"err": terr,
}).Debug("scheduler task finished")
}(fctx, t)
}
}
......
......@@ -11,6 +11,7 @@ import (
log "github.com/sirupsen/logrus"
"go.mongodb.org/mongo-driver/mongo"
"math/rand"
"time"
)
type poperV2 struct {
......@@ -48,6 +49,12 @@ func (p *poperV2) PopWorker(ctx context.Context, rdb *redis.Client, task *odysse
if ctx.Err() != nil {
return types.Worker{}, types.ErrTimeout
}
t1 := time.Now()
defer func() {
log.WithFields(log.Fields{
"cost": time.Since(t1).Milliseconds(),
}).Debug("pop worker cost")
}()
workers, err := p.workerRunningOperator.FindWorkerByModelId(ctx, int(task.TaskType), 10)
if err != nil {
log.WithField("tasktype", task.TaskType).WithError(err).Error("get running model worker failed")
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment