Commit d2e172ac authored by luxq's avatar luxq

adjust code

parent f64083c2
package query package query
import ( import (
"context"
"encoding/json" "encoding/json"
"fmt"
"github.com/odysseus/service-registry/common" "github.com/odysseus/service-registry/common"
"github.com/redis/go-redis/v9" "github.com/redis/go-redis/v9"
) )
...@@ -22,71 +20,23 @@ func (g GatewayInfo) Message() json.RawMessage { ...@@ -22,71 +20,23 @@ func (g GatewayInfo) Message() json.RawMessage {
return d return d
} }
type gatewayQuery struct { func (g GatewayInfo) Parse(res *redis.MapStringStringCmd) ServiceInfo {
rdb *redis.Client var info GatewayInfo
service common.ServiceType if err := res.Scan(&info); err != nil {
} return nil
func getGatewayQuery(rdb *redis.Client) ServiceQuery {
return gatewayQuery{
rdb: rdb,
service: common.SERVICE_API_GATEWAY,
}
}
func (g gatewayQuery) ModuleName() string {
return g.service.String()
}
func (g gatewayQuery) List() ([]string, error) {
all, err := getAllGw(g.rdb, g.service)
if err != nil {
return nil, err
}
var res []string
for _, v := range all {
d, _ := json.Marshal(v)
res = append(res, string(d))
}
return res, nil
}
func (g gatewayQuery) ServiceInfo(serviceid string) (string, error) {
info, err := getOneGw(g.rdb, g.service, serviceid)
if err != nil {
return "", err
}
d, err := json.Marshal(info)
if err != nil {
return "", err
} }
return info
return string(d), nil
} }
func getAllGw(rdb *redis.Client, serviceType common.ServiceType) ([]GatewayInfo, error) { type gatewayQuery struct {
keys, err := rdb.Keys(context.Background(), common.GetServiceKeyPrefix(serviceType)+"*").Result() baseService
if err != nil {
return nil, err
}
var nmInfos []GatewayInfo
for _, key := range keys {
res := rdb.HGetAll(context.Background(), key)
var info GatewayInfo
if err := res.Scan(&info); err != nil {
continue
}
nmInfos = append(nmInfos, info)
}
return nmInfos, nil
} }
func getOneGw(rdb *redis.Client, serviceType common.ServiceType, endpoint string) (GatewayInfo, error) { func getGatewayQuery(rdb *redis.Client) ServiceQuery {
k := fmt.Sprintf("%s%s", common.GetServiceKeyPrefix(serviceType), endpoint) return gatewayQuery{
res := rdb.HGetAll(context.Background(), k) baseService{
var info GatewayInfo rdb: rdb,
if err := res.Scan(&info); err != nil { service: common.SERVICE_API_GATEWAY,
return GatewayInfo{}, err },
} }
return info, nil
} }
package query package query
import ( import (
"context"
"encoding/json" "encoding/json"
"fmt"
"github.com/odysseus/service-registry/common" "github.com/odysseus/service-registry/common"
"github.com/redis/go-redis/v9" "github.com/redis/go-redis/v9"
) )
...@@ -22,72 +20,23 @@ func (g BackendInfo) Message() json.RawMessage { ...@@ -22,72 +20,23 @@ func (g BackendInfo) Message() json.RawMessage {
return d return d
} }
type backendQuery struct { func (g BackendInfo) Parse(res *redis.MapStringStringCmd) ServiceInfo {
rdb *redis.Client var info BackendInfo
service common.ServiceType if err := res.Scan(&info); err != nil {
} return nil
func getBackendQuery(rdb *redis.Client) ServiceQuery {
return backendQuery{
rdb: rdb,
service: common.SERVICE_BACKEND,
}
}
func (g backendQuery) ModuleName() string {
return g.service.String()
}
func (g backendQuery) List() ([]string, error) {
all, err := getAllBackend(g.rdb, g.service)
if err != nil {
return nil, err
}
var res []string
for _, v := range all {
d, _ := json.Marshal(v)
res = append(res, string(d))
}
return res, nil
}
func (g backendQuery) ServiceInfo(serviceid string) (string, error) {
info, err := getOneBackend(g.rdb, g.service, serviceid)
if err != nil {
return "", err
}
d, err := json.Marshal(info)
if err != nil {
return "", err
} }
return info
return string(d), nil
} }
func getAllBackend(rdb *redis.Client, serviceType common.ServiceType) ([]BackendInfo, error) { type backendQuery struct {
keys, err := rdb.Keys(context.Background(), common.GetServiceKeyPrefix(serviceType)+"*").Result() baseService
if err != nil {
return nil, err
}
var nmInfos []BackendInfo
for _, key := range keys {
res := rdb.HGetAll(context.Background(), key)
var info BackendInfo
if err := res.Scan(&info); err != nil {
continue
}
nmInfos = append(nmInfos, info)
}
return nmInfos, nil
} }
func getOneBackend(rdb *redis.Client, serviceType common.ServiceType, endpoint string) (BackendInfo, error) { func getBackendQuery(rdb *redis.Client) ServiceQuery {
k := fmt.Sprintf("%s%s", common.GetServiceKeyPrefix(serviceType), endpoint) return backendQuery{
res := rdb.HGetAll(context.Background(), k) baseService{
var info BackendInfo rdb: rdb,
if err := res.Scan(&info); err != nil { service: common.SERVICE_BACKEND,
return BackendInfo{}, err },
} }
return info, nil
} }
package query
import (
"context"
"fmt"
"github.com/odysseus/service-registry/common"
"github.com/redis/go-redis/v9"
"sort"
)
type baseService struct {
rdb *redis.Client
service common.ServiceType
}
func (g baseService) ModuleName() string {
return g.service.String()
}
func (g baseService) List() ([]string, error) {
all, err := getAll(g.rdb, g.service)
if err != nil {
return nil, err
}
sort.Sort(ServiceInfoList(all))
var res []string
for _, v := range all {
res = append(res, string(v.Message()))
}
return res, nil
}
func (g baseService) ServiceInfo(serviceid string) (string, error) {
info, err := getOne(g.rdb, g.service, serviceid)
if err != nil {
return "", err
}
return string(info.Message()), nil
}
func parseInfo(stype common.ServiceType, res *redis.MapStringStringCmd) ServiceInfo {
var ret ServiceInfo
switch stype {
case common.SERVICE_NODE_MANAGER:
ret = NodeManagerInfo{}.Parse(res)
case common.SERVICE_API_GATEWAY:
ret = GatewayInfo{}.Parse(res)
case common.SERVICE_BACKEND:
ret = BackendInfo{}.Parse(res)
case common.SERVICE_SCHEDULER:
ret = SchedulerInfo{}.Parse(res)
case common.SERVICE_WORKER:
ret = WorkerInfo{}.Parse(res)
}
return ret
}
func getAll(rdb *redis.Client, serviceType common.ServiceType) (ServiceInfoList, error) {
keys, err := rdb.Keys(context.Background(), common.GetServiceKeyPrefix(serviceType)+"*").Result()
if err != nil {
return nil, err
}
var list []ServiceInfo
for _, key := range keys {
res := rdb.HGetAll(context.Background(), key)
if info := parseInfo(serviceType, res); info != nil {
list = append(list, info)
}
}
return list, nil
}
func getOne(rdb *redis.Client, serviceType common.ServiceType, endpoint string) (ServiceInfo, error) {
k := fmt.Sprintf("%s%s", common.GetServiceKeyPrefix(serviceType), endpoint)
res := rdb.HGetAll(context.Background(), k)
return parseInfo(serviceType, res), nil
}
package query package query
import "encoding/json" import (
"encoding/json"
"github.com/redis/go-redis/v9"
)
type ServiceQuery interface { type ServiceQuery interface {
ModuleName() string ModuleName() string
...@@ -11,6 +14,7 @@ type ServiceQuery interface { ...@@ -11,6 +14,7 @@ type ServiceQuery interface {
type ServiceInfo interface { type ServiceInfo interface {
TimeStamp() int64 TimeStamp() int64
Message() json.RawMessage Message() json.RawMessage
Parse(*redis.MapStringStringCmd) ServiceInfo
} }
type ServiceInfoList []ServiceInfo type ServiceInfoList []ServiceInfo
......
package query package query
import ( import (
"context"
"encoding/json" "encoding/json"
"fmt"
"github.com/odysseus/service-registry/common" "github.com/odysseus/service-registry/common"
"github.com/redis/go-redis/v9" "github.com/redis/go-redis/v9"
"sort"
) )
type NodeManagerInfo struct { type NodeManagerInfo struct {
...@@ -23,97 +20,23 @@ func (g NodeManagerInfo) Message() json.RawMessage { ...@@ -23,97 +20,23 @@ func (g NodeManagerInfo) Message() json.RawMessage {
return d return d
} }
type nodeManagerQuery struct { func (g NodeManagerInfo) Parse(res *redis.MapStringStringCmd) ServiceInfo {
rdb *redis.Client var info NodeManagerInfo
service common.ServiceType if err := res.Scan(&info); err != nil {
} return nil
func getNodeManagerQuery(rdb *redis.Client) ServiceQuery {
return nodeManagerQuery{
rdb: rdb,
service: common.SERVICE_NODE_MANAGER,
}
}
func (g nodeManagerQuery) ModuleName() string {
return g.service.String()
}
func (g nodeManagerQuery) List() ([]string, error) {
all, err := getAll(g.rdb, g.service)
if err != nil {
return nil, err
}
sort.Sort(ServiceInfoList(all))
var res []string
for _, v := range all {
res = append(res, string(v.Message()))
} }
return res, nil return info
} }
func (g nodeManagerQuery) ServiceInfo(serviceid string) (string, error) { type nodeManagerQuery struct {
info, err := getOne(g.rdb, g.service, serviceid) baseService
if err != nil {
return "", err
}
return string(info.Message()), nil
}
func parseInfo(stype common.ServiceType, res *redis.MapStringStringCmd) ServiceInfo {
var ret ServiceInfo
switch stype {
case common.SERVICE_NODE_MANAGER:
var info NodeManagerInfo
if err := res.Scan(&info); err != nil {
return nil
}
ret = info
case common.SERVICE_API_GATEWAY:
var info GatewayInfo
if err := res.Scan(&info); err != nil {
return nil
}
ret = info
case common.SERVICE_BACKEND:
var info BackendInfo
if err := res.Scan(&info); err != nil {
return nil
}
ret = info
case common.SERVICE_SCHEDULER:
var info SchedulerInfo
if err := res.Scan(&info); err != nil {
return nil
}
ret = info
case common.SERVICE_WORKER:
var info WorkerInfo
if err := res.Scan(&info); err != nil {
return nil
}
ret = info
}
return ret
} }
func getAll(rdb *redis.Client, serviceType common.ServiceType) (ServiceInfoList, error) { func getNodeManagerQuery(rdb *redis.Client) ServiceQuery {
keys, err := rdb.Keys(context.Background(), common.GetServiceKeyPrefix(serviceType)+"*").Result() return nodeManagerQuery{
if err != nil { baseService{
return nil, err rdb: rdb,
} service: common.SERVICE_NODE_MANAGER,
var list []ServiceInfo },
for _, key := range keys {
res := rdb.HGetAll(context.Background(), key)
if info := parseInfo(serviceType, res); info != nil {
list = append(list, info)
}
} }
return list, nil
}
func getOne(rdb *redis.Client, serviceType common.ServiceType, endpoint string) (ServiceInfo, error) {
k := fmt.Sprintf("%s%s", common.GetServiceKeyPrefix(serviceType), endpoint)
res := rdb.HGetAll(context.Background(), k)
return parseInfo(serviceType, res), nil
} }
...@@ -4,15 +4,23 @@ import ( ...@@ -4,15 +4,23 @@ import (
"github.com/odysseus/service-registry/common" "github.com/odysseus/service-registry/common"
"github.com/odysseus/service-registry/registry" "github.com/odysseus/service-registry/registry"
"github.com/redis/go-redis/v9" "github.com/redis/go-redis/v9"
"sync"
) )
type ServiceQuerier struct { type ServiceQuerier struct {
rdb *redis.Client rdb *redis.Client
nmQuery ServiceQuery queriers map[common.ServiceType]ServiceQuery
schedulerQuery ServiceQuery mux sync.RWMutex
gatewayQuery ServiceQuery }
backendInfoQuery ServiceQuery
workerQuery ServiceQuery var qs = make(map[common.ServiceType]func(rdb *redis.Client) ServiceQuery)
func init() {
qs[common.SERVICE_NODE_MANAGER] = getNodeManagerQuery
qs[common.SERVICE_API_GATEWAY] = getGatewayQuery
qs[common.SERVICE_BACKEND] = getBackendQuery
qs[common.SERVICE_SCHEDULER] = getSchedulerQuery
qs[common.SERVICE_WORKER] = getWorkerQuery
} }
func NewQuery(redisParam registry.RedisConnParam) *ServiceQuerier { func NewQuery(redisParam registry.RedisConnParam) *ServiceQuerier {
...@@ -21,31 +29,19 @@ func NewQuery(redisParam registry.RedisConnParam) *ServiceQuerier { ...@@ -21,31 +29,19 @@ func NewQuery(redisParam registry.RedisConnParam) *ServiceQuerier {
Password: redisParam.Password, Password: redisParam.Password,
DB: redisParam.DbIndex, DB: redisParam.DbIndex,
}) })
queriers := make(map[common.ServiceType]ServiceQuery)
for k, v := range qs {
queriers[k] = v(rdb)
}
return &ServiceQuerier{ return &ServiceQuerier{
rdb: rdb, rdb: rdb,
nmQuery: getNodeManagerQuery(rdb), queriers: queriers,
schedulerQuery: getSchedulerQuery(rdb),
gatewayQuery: getGatewayQuery(rdb),
backendInfoQuery: getBackendQuery(rdb),
workerQuery: getWorkerQuery(rdb),
} }
} }
func (s *ServiceQuerier) Select(service common.ServiceType) ServiceQuery { func (s *ServiceQuerier) Select(service common.ServiceType) ServiceQuery {
switch service { s.mux.RLock()
case common.SERVICE_NODE_MANAGER: defer s.mux.RUnlock()
return s.nmQuery return s.queriers[service]
case common.SERVICE_SCHEDULER:
return s.schedulerQuery
case common.SERVICE_BACKEND:
return s.backendInfoQuery
case common.SERVICE_API_GATEWAY:
return s.gatewayQuery
case common.SERVICE_WORKER:
return s.workerQuery
default:
return nil
}
} }
package query package query
import ( import (
"context"
"encoding/json" "encoding/json"
"fmt"
"github.com/odysseus/service-registry/common" "github.com/odysseus/service-registry/common"
"github.com/redis/go-redis/v9" "github.com/redis/go-redis/v9"
) )
...@@ -22,71 +20,23 @@ func (g SchedulerInfo) Message() json.RawMessage { ...@@ -22,71 +20,23 @@ func (g SchedulerInfo) Message() json.RawMessage {
return d return d
} }
type schedulerQuery struct { func (g SchedulerInfo) Parse(res *redis.MapStringStringCmd) ServiceInfo {
rdb *redis.Client var info SchedulerInfo
service common.ServiceType if err := res.Scan(&info); err != nil {
} return nil
func getSchedulerQuery(rdb *redis.Client) ServiceQuery {
return schedulerQuery{
rdb: rdb,
service: common.SERVICE_SCHEDULER,
}
}
func (g schedulerQuery) ModuleName() string {
return g.service.String()
}
func (g schedulerQuery) List() ([]string, error) {
all, err := getAllSc(g.rdb, g.service)
if err != nil {
return nil, err
}
var res []string
for _, v := range all {
d, _ := json.Marshal(v)
res = append(res, string(d))
}
return res, nil
}
func (g schedulerQuery) ServiceInfo(serviceid string) (string, error) {
info, err := getOneSc(g.rdb, g.service, serviceid)
if err != nil {
return "", err
}
d, err := json.Marshal(info)
if err != nil {
return "", err
} }
return info
return string(d), nil
} }
func getAllSc(rdb *redis.Client, serviceType common.ServiceType) ([]SchedulerInfo, error) { type schedulerQuery struct {
keys, err := rdb.Keys(context.Background(), common.GetServiceKeyPrefix(serviceType)+"*").Result() baseService
if err != nil {
return nil, err
}
var nmInfos []SchedulerInfo
for _, key := range keys {
res := rdb.HGetAll(context.Background(), key)
var info SchedulerInfo
if err := res.Scan(&info); err != nil {
continue
}
nmInfos = append(nmInfos, info)
}
return nmInfos, nil
} }
func getOneSc(rdb *redis.Client, serviceType common.ServiceType, endpoint string) (SchedulerInfo, error) { func getSchedulerQuery(rdb *redis.Client) ServiceQuery {
k := fmt.Sprintf("%s%s", common.GetServiceKeyPrefix(serviceType), endpoint) return schedulerQuery{
res := rdb.HGetAll(context.Background(), k) baseService{
var info SchedulerInfo rdb: rdb,
if err := res.Scan(&info); err != nil { service: common.SERVICE_SCHEDULER,
return SchedulerInfo{}, err },
} }
return info, nil
} }
package query package query
import ( import (
"context"
"encoding/json" "encoding/json"
"fmt"
"github.com/odysseus/service-registry/common" "github.com/odysseus/service-registry/common"
"github.com/redis/go-redis/v9" "github.com/redis/go-redis/v9"
) )
...@@ -27,71 +25,23 @@ func (g WorkerInfo) Message() json.RawMessage { ...@@ -27,71 +25,23 @@ func (g WorkerInfo) Message() json.RawMessage {
return d return d
} }
type workerQuery struct { func (g WorkerInfo) Parse(res *redis.MapStringStringCmd) ServiceInfo {
rdb *redis.Client var info WorkerInfo
service common.ServiceType if err := res.Scan(&info); err != nil {
} return nil
func getWorkerQuery(rdb *redis.Client) ServiceQuery {
return workerQuery{
rdb: rdb,
service: common.SERVICE_WORKER,
}
}
func (g workerQuery) ModuleName() string {
return g.service.String()
}
func (g workerQuery) List() ([]string, error) {
all, err := getAllWorker(g.rdb, g.service)
if err != nil {
return nil, err
}
var res []string
for _, v := range all {
d, _ := json.Marshal(v)
res = append(res, string(d))
}
return res, nil
}
func (g workerQuery) ServiceInfo(serviceid string) (string, error) {
info, err := getOneWorker(g.rdb, g.service, serviceid)
if err != nil {
return "", err
}
d, err := json.Marshal(info)
if err != nil {
return "", err
} }
return info
return string(d), nil
} }
func getAllWorker(rdb *redis.Client, serviceType common.ServiceType) ([]WorkerInfo, error) { type workerQuery struct {
keys, err := rdb.Keys(context.Background(), common.GetServiceKeyPrefix(serviceType)+"*").Result() baseService
if err != nil {
return nil, err
}
var nmInfos []WorkerInfo
for _, key := range keys {
res := rdb.HGetAll(context.Background(), key)
var info WorkerInfo
if err := res.Scan(&info); err != nil {
continue
}
nmInfos = append(nmInfos, info)
}
return nmInfos, nil
} }
func getOneWorker(rdb *redis.Client, serviceType common.ServiceType, endpoint string) (WorkerInfo, error) { func getWorkerQuery(rdb *redis.Client) ServiceQuery {
k := fmt.Sprintf("%s%s", common.GetServiceKeyPrefix(serviceType), endpoint) return workerQuery{
res := rdb.HGetAll(context.Background(), k) baseService{
var info WorkerInfo rdb: rdb,
if err := res.Scan(&info); err != nil { service: common.SERVICE_WORKER,
return WorkerInfo{}, err },
} }
return info, nil
} }
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment