Commit 25f9171b authored by vicotor's avatar vicotor

add service registry

parent b6ff28e3
package server package server
import ( import (
"context" "encoding/json"
"github.com/odysseus/scheduler/config" "github.com/odysseus/service-registry/common"
"github.com/odysseus/service-registry/query"
"github.com/odysseus/service-registry/registry"
"github.com/redis/go-redis/v9" "github.com/redis/go-redis/v9"
"time" "time"
) )
type NMRegistryInfo struct { func (n *Node) allNodeManager() map[string]registry.RegistryInfo {
Pubkey string `redis:"pubkey"` querier := query.NewQuery(registry.RedisConnParam{
Timestamp int64 `redis:"timestamp"` Addr: n.conf.Redis.Addr,
Endpoint string `redis:"endpoint"` Password: n.conf.Redis.Password,
Addr string `redis:"addr"` DbIndex: n.conf.Redis.DbIndex,
} })
result, err := querier.Select(common.SERVICE_NODE_MANAGER).List()
// managerUpdateTm got nm latest update time.
func managerLatestUpdateTm(rdb *redis.Client, manager string) int64 {
var invalid int64 = 1000000000
allnm, err := allNodeManager(rdb)
if err != nil {
return invalid
}
if _, ok := allnm[manager]; ok {
return allnm[manager].Timestamp - time.Now().Unix()
}
return invalid
}
func allNodeManager(rdb *redis.Client) (map[string]NMRegistryInfo, error) {
var ret = make(map[string]NMRegistryInfo)
var tsExpired = 100
keys, err := rdb.Keys(context.Background(), config.NODE_MANAGER_SET+"*").Result()
if err != nil { if err != nil {
return nil, err return nil
} }
for _, key := range keys { res := make(map[string]registry.RegistryInfo, 0)
res := rdb.HGetAll(context.Background(), key) for _, v := range result {
var info NMRegistryInfo var info registry.RegistryInfo
if err := res.Scan(&info); err != nil { var nminfo query.NodeManagerInfo
if err := json.Unmarshal([]byte(v), &info); err != nil {
continue continue
} }
if time.Now().Unix()-info.Timestamp > int64(tsExpired) { if err := json.Unmarshal([]byte(info.Detail), &nminfo); err != nil {
// heart beat expired, ignore this nodemanager
continue continue
} else {
res[nminfo.EndPoint] = info
} }
ret[info.Endpoint] = info
} }
return ret, nil return res
}
// managerUpdateTm got nm latest update time.
func (n *Node) managerLatestUpdateTm(rdb *redis.Client, manager_endpoint string) int64 {
var invalid int64 = 1000000000
allnm := n.allNodeManager()
if _, ok := allnm[manager_endpoint]; ok {
return allnm[manager_endpoint].Timestamp - time.Now().Unix()
}
return invalid
} }
...@@ -32,21 +32,24 @@ type Node struct { ...@@ -32,21 +32,24 @@ type Node struct {
cache *cachedata.CacheData cache *cachedata.CacheData
wg sync.WaitGroup wg sync.WaitGroup
status string status string
reg *registry.Registry
} }
func (n *Node) ServiceType() common.ServiceType { func (n *Node) ServiceType() common.ServiceType {
return common.SERVICE_SCHEDULER return common.SERVICE_SCHEDULER
} }
func (n *Node) Endpoint() string { func (n *Node) Status() string {
return n.status
}
func (n *Node) Instance() string {
hname, _ := os.Hostname() hname, _ := os.Hostname()
return fmt.Sprintf("%s:%d", hname, n.conf.Endpoint) return fmt.Sprintf("%s:%d", hname, n.conf.Endpoint)
} }
func (n *Node) DetailInfo() (json.RawMessage, error) { func (n *Node) DetailInfo() (json.RawMessage, error) {
info := query.SchedulerInfo{} info := query.SchedulerInfo{}
info.Status = n.status
return json.Marshal(info) return json.Marshal(info)
} }
...@@ -102,12 +105,13 @@ func (n *Node) startAllTask() error { ...@@ -102,12 +105,13 @@ func (n *Node) startAllTask() error {
} }
func (n *Node) register() { func (n *Node) register() {
registry := registry.NewRegistry(registry.RedisConnParam{ reg := registry.NewRegistry(registry.RedisConnParam{
Addr: n.conf.Redis.Addr, Addr: n.conf.Redis.Addr,
Password: n.conf.Redis.Password, Password: n.conf.Redis.Password,
DbIndex: n.conf.Redis.DbIndex, DbIndex: n.conf.Redis.DbIndex,
}, n) }, n)
registry.Start() n.reg = reg
reg.Start()
} }
func (n *Node) Loop(idx int) { func (n *Node) Loop(idx int) {
...@@ -205,7 +209,7 @@ func (n *Node) Loop(idx int) { ...@@ -205,7 +209,7 @@ func (n *Node) Loop(idx int) {
l.WithError(err).Error("pop worker failed") l.WithError(err).Error("pop worker failed")
continue continue
} }
err = DispatchTask(n.rdb, worker, task) err = n.DispatchTask(n.rdb, worker, task)
if err != nil { if err != nil {
l.WithError(err).Error("dispatch task failed") l.WithError(err).Error("dispatch task failed")
continue continue
......
...@@ -75,14 +75,14 @@ func newManagerClient(endpoint string) (omanager.NodeManagerServiceClient, error ...@@ -75,14 +75,14 @@ func newManagerClient(endpoint string) (omanager.NodeManagerServiceClient, error
return manager, nil return manager, nil
} }
func DispatchTask(rdb *redis.Client, worker Worker, task *odysseus.TaskContent) error { func (n *Node) DispatchTask(rdb *redis.Client, worker Worker, task *odysseus.TaskContent) error {
l := log.WithField("task-id", task.TaskId) l := log.WithField("task-id", task.TaskId)
l.WithFields(log.Fields{ l.WithFields(log.Fields{
"worker": worker.workerid, "worker": worker.workerid,
"managerList": worker.managers, "managerList": worker.managers,
}).Debug("dispatch task to worker") }).Debug("dispatch task to worker")
for _, manager := range worker.managers { for _, manager := range worker.managers {
if latest := managerLatestUpdateTm(rdb, manager); latest > int64(config.GetConfig().MaxNmUpdateEx) { if latest := n.managerLatestUpdateTm(rdb, manager); latest > int64(config.GetConfig().MaxNmUpdateEx) {
l.WithField("manager", manager).Warn("ignore the manager because of not update tm") l.WithField("manager", manager).Warn("ignore the manager because of not update tm")
continue continue
} }
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment