Commit 9b5a6bb4 authored by vicotor's avatar vicotor

add nm check

parent 988f5810
endpoint="127.0.0.1:10002" endpoint="127.0.0.1:10002"
metrics_port = 28012 metrics_port = 28012
routines = 1 routines = 1
max_nm_update_ex = 40
[redis] [redis]
addr="127.0.0.1:6379" addr="127.0.0.1:6379"
......
...@@ -26,12 +26,13 @@ type MysqlConfig struct { ...@@ -26,12 +26,13 @@ type MysqlConfig struct {
} }
type Config struct { type Config struct {
Endpoint string `json:"endpoint" toml:"endpoint"` Endpoint string `json:"endpoint" toml:"endpoint"`
MetricPort int `json:"metrics_port" toml:"metrics_port"` MetricPort int `json:"metrics_port" toml:"metrics_port"`
Routines int `json:"routines" toml:"routines"` Routines int `json:"routines" toml:"routines"`
Redis RedisConfig `json:"redis" toml:"redis"` MaxNmUpdateEx int `json:"max_nm_update_ex" toml:"max_nm_update_ex"`
Kafka KafkaConfig `json:"kafka" toml:"kafka"` Redis RedisConfig `json:"redis" toml:"redis"`
DbConfig MysqlConfig `json:"mysql" toml:"mysql"` Kafka KafkaConfig `json:"kafka" toml:"kafka"`
DbConfig MysqlConfig `json:"mysql" toml:"mysql"`
} }
var _cfg *Config = nil var _cfg *Config = nil
......
package server
import (
"context"
"github.com/odysseus/scheduler/config"
"github.com/redis/go-redis/v9"
"time"
)
type NMRegistryInfo struct {
Pubkey string `redis:"pubkey"`
Timestamp int64 `redis:"timestamp"`
Endpoint string `redis:"endpoint"`
Addr string `redis:"addr"`
}
// managerUpdateTm got nm latest update time.
func managerLatestUpdateTm(rdb *redis.Client, manager string) int64 {
var invalid int64 = 1000000000
allnm, err := allNodeManager(rdb)
if err != nil {
return invalid
}
if _, ok := allnm[manager]; ok {
return allnm[manager].Timestamp - time.Now().Unix()
}
return invalid
}
func allNodeManager(rdb *redis.Client) (map[string]NMRegistryInfo, error) {
var ret = make(map[string]NMRegistryInfo)
var tsExpired = 100
keys, err := rdb.Keys(context.Background(), config.NODE_MANAGER_SET+"*").Result()
if err != nil {
return nil, err
}
for _, key := range keys {
res := rdb.HGetAll(context.Background(), key)
var info NMRegistryInfo
if err := res.Scan(&info); err != nil {
continue
}
if time.Now().Unix()-info.Timestamp > int64(tsExpired) {
// heart beat expired, ignore this nodemanager
continue
}
ret[info.Endpoint] = info
}
return ret, nil
}
...@@ -71,13 +71,18 @@ func newManagerClient(endpoint string) (omanager.NodeManagerServiceClient, error ...@@ -71,13 +71,18 @@ func newManagerClient(endpoint string) (omanager.NodeManagerServiceClient, error
return manager, nil return manager, nil
} }
func DispatchTask(worker Worker, task *odysseus.TaskContent) error { func DispatchTask(rdb *redis.Client, worker Worker, task *odysseus.TaskContent) error {
l := log.WithField("task-id", task.TaskId) l := log.WithField("task-id", task.TaskId)
l.WithFields(log.Fields{ l.WithFields(log.Fields{
"worker": worker.workerid, "worker": worker.workerid,
"managerList": worker.managers, "managerList": worker.managers,
}).Debug("dispatch task to worker") }).Debug("dispatch task to worker")
for _, manager := range worker.managers { for _, manager := range worker.managers {
if latest := managerLatestUpdateTm(rdb, manager); latest > int64(config.GetConfig().MaxNmUpdateEx) {
l.WithField("manager", manager).Warn("ignore the manager because of not update tm")
continue
}
client, err := newManagerClient(manager) client, err := newManagerClient(manager)
if err != nil { if err != nil {
l.WithFields(log.Fields{ l.WithFields(log.Fields{
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment