Commit 3ade7f32 authored by vicotor's avatar vicotor

update scheduler

parent bf072567
package server
import (
"encoding/json"
"github.com/odysseus/service-registry/common"
"github.com/odysseus/service-registry/query"
"github.com/odysseus/service-registry/registry"
"github.com/redis/go-redis/v9"
"time"
)
func (n *Node) allNodeManager() map[string]registry.RegistryInfo {
querier := query.NewQuery(registry.RedisConnParam{
Addr: n.conf.Redis.Addr,
Password: n.conf.Redis.Password,
DbIndex: n.conf.Redis.DbIndex,
})
result, err := querier.Select(common.SERVICE_NODE_MANAGER).List()
if err != nil {
return nil
}
res := make(map[string]registry.RegistryInfo, 0)
for _, v := range result {
var info registry.RegistryInfo
var nminfo query.NodeManagerInfo
if err := json.Unmarshal([]byte(v), &info); err != nil {
continue
}
if err := json.Unmarshal([]byte(info.Detail), &nminfo); err != nil {
continue
} else {
res[nminfo.Endpoint] = info
}
}
return res
}
// managerUpdateTm got nm latest update time.
func (n *Node) managerLatestUpdateTm(rdb *redis.Client, manager_endpoint string) int64 {
var invalid int64 = 1000000000
allnm := n.allNodeManager()
if _, ok := allnm[manager_endpoint]; ok {
return allnm[manager_endpoint].Timestamp - time.Now().Unix()
}
return invalid
}
......@@ -12,6 +12,8 @@ import (
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
"strconv"
"strings"
"time"
)
var (
......@@ -25,6 +27,8 @@ var (
type Worker struct {
workerid string
addr string
nonce int64
priority int
managers []string
}
......@@ -38,6 +42,7 @@ func PopWorker(rdb *redis.Client) (Worker, error) {
break
}
log.WithField("elem", elem).Debug("lPop worker")
addr, nonce := parseWorkerId(elem)
managerList, err := rdb.SMembers(context.Background(), workerStatusKey(elem)).Result()
if err != nil {
log.WithError(err).Error("get worker status failed")
......@@ -49,6 +54,8 @@ func PopWorker(rdb *redis.Client) (Worker, error) {
}
return Worker{
workerid: elem,
addr: addr,
nonce: nonce,
priority: i,
managers: managerList,
}, nil
......@@ -75,6 +82,29 @@ func newManagerClient(endpoint string) (omanager.NodeManagerServiceClient, error
return manager, nil
}
func parseWorkerId(elem string) (string, int64) {
split := "_"
strs := strings.Split(elem, split)
if len(strs) == 2 {
addr := strs[0]
nonce, _ := strconv.ParseInt(strs[1], 10, 64)
return addr, nonce
}
return "", 0
}
func parseWorkerNmValue(nmValue string) (string, int64) {
split := "#"
strs := strings.Split(nmValue, split)
if len(strs) == 2 {
endpoint := strs[0]
timestamp, _ := strconv.ParseInt(strs[1], 10, 64)
return endpoint, timestamp
}
return "", 0
}
func (n *Node) DispatchTask(rdb *redis.Client, worker Worker, task *odysseus.TaskContent) error {
l := log.WithField("task-id", task.TaskId)
l.WithFields(log.Fields{
......@@ -82,15 +112,16 @@ func (n *Node) DispatchTask(rdb *redis.Client, worker Worker, task *odysseus.Tas
"managerList": worker.managers,
}).Debug("dispatch task to worker")
for _, manager := range worker.managers {
if latest := n.managerLatestUpdateTm(rdb, manager); latest > int64(config.GetConfig().MaxNmUpdateEx) {
endpoint, updateTime := parseWorkerNmValue(manager)
if time.Now().Unix()-updateTime > int64(config.GetConfig().MaxNmUpdateEx) {
l.WithField("manager", manager).Warn("ignore the manager because of not update tm")
continue
}
client, err := newManagerClient(manager)
client, err := newManagerClient(endpoint)
if err != nil {
l.WithFields(log.Fields{
"manager": manager,
"manager": endpoint,
"error": err,
}).Error("connect to manager failed")
continue
......@@ -101,7 +132,7 @@ func (n *Node) DispatchTask(rdb *redis.Client, worker Worker, task *odysseus.Tas
})
if err != nil {
l.WithFields(log.Fields{
"manager": manager,
"manager": endpoint,
"error": err,
}).Error("dispatch to manager failed")
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment