Commit 66301fb8 authored by luxq's avatar luxq

update

parent ed600923
package query
import (
"encoding/json"
"github.com/odysseus/service-registry/common"
"github.com/odysseus/service-registry/registry"
"github.com/redis/go-redis/v9"
)
type GatewayInfo struct {
Timestamp int64 `redis:"timestamp" json:"timestamp"`
Endpoint string `redis:"endpoint" json:"endpoint"`
}
func (g GatewayInfo) TimeStamp() int64 {
return g.Timestamp
}
func (g GatewayInfo) Message() json.RawMessage {
d, _ := json.Marshal(g)
return d
}
func (g GatewayInfo) Parse(res *redis.MapStringStringCmd) ServiceInfo {
var info GatewayInfo
var regInfo registry.RegistryInfo
if err := res.Scan(&regInfo); err != nil {
return nil
}
if err := json.Unmarshal([]byte(regInfo.Detail), &info); err != nil {
return nil
}
info.Timestamp = regInfo.Timestamp
info.Endpoint = regInfo.Endpoint
return info
// gateway specific info
}
type gatewayQuery struct {
......
package query
import (
"encoding/json"
"github.com/odysseus/service-registry/common"
"github.com/odysseus/service-registry/registry"
"github.com/redis/go-redis/v9"
)
type BackendInfo struct {
Timestamp int64 `redis:"timestamp" json:"timestamp"`
Endpoint string `redis:"endpoint" json:"endpoint"`
}
func (g BackendInfo) TimeStamp() int64 {
return g.Timestamp
}
func (g BackendInfo) Message() json.RawMessage {
d, _ := json.Marshal(g)
return d
}
func (g BackendInfo) Parse(res *redis.MapStringStringCmd) ServiceInfo {
var info BackendInfo
var regInfo registry.RegistryInfo
if err := res.Scan(&regInfo); err != nil {
return nil
}
if err := json.Unmarshal([]byte(regInfo.Detail), &info); err != nil {
return nil
}
info.Timestamp = regInfo.Timestamp
info.Endpoint = regInfo.Endpoint
return info
// backend specific info
}
type backendQuery struct {
......
......@@ -4,6 +4,7 @@ import (
"context"
"fmt"
"github.com/odysseus/service-registry/common"
"github.com/odysseus/service-registry/registry"
"github.com/redis/go-redis/v9"
"sort"
)
......@@ -39,20 +40,11 @@ func (g baseService) ServiceInfo(serviceid string) (string, error) {
}
func parseInfo(stype common.ServiceType, res *redis.MapStringStringCmd) ServiceInfo {
var ret ServiceInfo
switch stype {
case common.SERVICE_NODE_MANAGER:
ret = NodeManagerInfo{}.Parse(res)
case common.SERVICE_API_GATEWAY:
ret = GatewayInfo{}.Parse(res)
case common.SERVICE_BACKEND:
ret = BackendInfo{}.Parse(res)
case common.SERVICE_SCHEDULER:
ret = SchedulerInfo{}.Parse(res)
case common.SERVICE_WORKER:
ret = WorkerInfo{}.Parse(res)
var regInfo registry.RegistryInfo
if err := res.Scan(&regInfo); err != nil {
return nil
}
return ret
return regInfo
}
func getAll(rdb *redis.Client, serviceType common.ServiceType) (ServiceInfoList, error) {
......
package query
import (
"encoding/json"
"github.com/redis/go-redis/v9"
)
import "encoding/json"
type ServiceQuery interface {
ModuleName() string
List() ([]string, error)
ServiceInfo(serviceid string) (string, error)
ServiceInfo(instance string) (string, error)
}
type ServiceInfo interface {
TimeStamp() int64
Message() json.RawMessage
Parse(*redis.MapStringStringCmd) ServiceInfo
}
type ServiceInfoList []ServiceInfo
......
package query
import (
"encoding/json"
"github.com/odysseus/service-registry/common"
"github.com/odysseus/service-registry/registry"
"github.com/redis/go-redis/v9"
)
type NodeManagerInfo struct {
Timestamp int64 `redis:"timestamp" json:"timestamp"`
Endpoint string `redis:"endpoint" json:"endpoint"`
Status string `redis:"status" json:"status"`
}
func (g NodeManagerInfo) TimeStamp() int64 {
return g.Timestamp
}
func (g NodeManagerInfo) Message() json.RawMessage {
d, _ := json.Marshal(g)
return d
}
func (g NodeManagerInfo) Parse(res *redis.MapStringStringCmd) ServiceInfo {
var info NodeManagerInfo
var regInfo registry.RegistryInfo
if err := res.Scan(&regInfo); err != nil {
return nil
}
if err := json.Unmarshal([]byte(regInfo.Detail), &info); err != nil {
return nil
}
info.Timestamp = regInfo.Timestamp
info.Endpoint = regInfo.Endpoint
return info
// node manager specific info
}
type nodeManagerQuery struct {
......
package query
import (
"encoding/json"
"github.com/odysseus/service-registry/common"
"github.com/odysseus/service-registry/registry"
"github.com/redis/go-redis/v9"
)
type SchedulerInfo struct {
Timestamp int64 `redis:"timestamp" json:"timestamp"`
Endpoint string `redis:"endpoint" json:"endpoint"`
Status string `redis:"status" json:"status"`
}
func (g SchedulerInfo) TimeStamp() int64 {
return g.Timestamp
}
func (g SchedulerInfo) Message() json.RawMessage {
d, _ := json.Marshal(g)
return d
}
func (g SchedulerInfo) Parse(res *redis.MapStringStringCmd) ServiceInfo {
var info SchedulerInfo
var regInfo registry.RegistryInfo
if err := res.Scan(&regInfo); err != nil {
return nil
}
if err := json.Unmarshal([]byte(regInfo.Detail), &info); err != nil {
return nil
}
info.Timestamp = regInfo.Timestamp
info.Endpoint = regInfo.Endpoint
return info
// scheduler specific info
}
type schedulerQuery struct {
......
package query
import (
"encoding/json"
"github.com/odysseus/service-registry/common"
"github.com/odysseus/service-registry/registry"
"github.com/redis/go-redis/v9"
)
type WorkerInfo struct {
Timestamp int64 `redis:"timestamp" json:"timestamp"`
Endpoint string `redis:"endpoint" json:"endpoint"`
// worker specific info
HearBeat int64 `redis:"heartbeat" json:"heartbeat"`
ActiveNM []string `redis:"active_nm" json:"active_nm"`
MinerAddress string `redis:"miner_address" json:"miner_address"`
BenefitAddress string `redis:"benefit_address" json:"benefit_address"`
IPs []string `redis:"ip_list" json:"ip_list"`
Status string `redis:"status" json:"status"`
}
func (g WorkerInfo) TimeStamp() int64 {
return g.Timestamp
}
func (g WorkerInfo) Message() json.RawMessage {
d, _ := json.Marshal(g)
return d
}
func (g WorkerInfo) Parse(res *redis.MapStringStringCmd) ServiceInfo {
var info WorkerInfo
var regInfo registry.RegistryInfo
if err := res.Scan(&regInfo); err != nil {
return nil
}
if err := json.Unmarshal([]byte(regInfo.Detail), &info); err != nil {
return nil
}
info.Timestamp = regInfo.Timestamp
info.Endpoint = regInfo.Endpoint
return info
IP string `redis:"ip" json:"ip"`
}
type workerQuery struct {
......
......@@ -12,15 +12,25 @@ import (
)
type RegistryInfo struct {
ServiceName string `redis:"service_name" json:"service_name"`
Timestamp int64 `redis:"timestamp" json:"timestamp"`
Endpoint string `redis:"endpoint" json:"endpoint"`
Detail string `redis:"detail" json:"detail"`
Timestamp int64 `redis:"timestamp" json:"timestamp"`
Instance string `redis:"instance" json:"instance"` // example: hostip + hostname
Status string `redis:"status" json:"status"` // example: kafka failed or running.
Detail string `redis:"detail" json:"detail"`
}
func (g RegistryInfo) TimeStamp() int64 {
return g.Timestamp
}
func (g RegistryInfo) Message() json.RawMessage {
d, _ := json.Marshal(g)
return d
}
type Register interface {
ServiceType() common.ServiceType
Endpoint() string
Instance() string
Status() string
DetailInfo() (json.RawMessage, error)
}
......@@ -38,6 +48,13 @@ type RedisConnParam struct {
}
func NewRegistry(redisParam RedisConnParam, register Register) *Registry {
switch register.ServiceType() {
case common.SERVICE_NODE_MANAGER, common.SERVICE_API_GATEWAY, common.SERVICE_BACKEND, common.SERVICE_SCHEDULER, common.SERVICE_WORKER:
//nothing
default:
log.WithField("service", register.ServiceType()).Error("not support service type")
return nil
}
rdb := redis.NewClient(&redis.Options{
Addr: redisParam.Addr,
Password: redisParam.Password,
......@@ -74,18 +91,19 @@ func (s *Registry) Stop() {
}
func (s *Registry) registry(rdb *redis.Client) error {
k := fmt.Sprintf("%s%s", common.GetServiceKeyPrefix(s.register.ServiceType()), s.register.Endpoint())
k := fmt.Sprintf("%s%s", common.GetServiceKeyPrefix(s.register.ServiceType()), s.register.Instance())
detail, err := s.register.DetailInfo()
if err != nil {
log.WithError(err).Error("get detail info failed")
return err
}
status := s.register.Status()
err = rdb.HSet(context.Background(), k, RegistryInfo{
ServiceName: s.register.ServiceType().String(),
Timestamp: time.Now().Unix(),
Endpoint: s.register.Endpoint(),
Detail: string(detail),
Timestamp: time.Now().Unix(),
Status: status,
Instance: s.register.Instance(),
Detail: string(detail),
}).Err()
if err != nil {
log.WithError(err).Error("set register info failed")
......
......@@ -3,6 +3,7 @@ package registry
import (
"encoding/json"
"github.com/odysseus/service-registry/common"
"os"
"testing"
"time"
)
......@@ -10,12 +11,17 @@ import (
type demoService struct {
}
func (d demoService) ServiceType() common.ServiceType {
return common.SERVICE_NODE_MANAGER
func (d demoService) Instance() string {
hname, _ := os.Hostname()
return "demo" + hname
}
func (d demoService) Status() string {
return "running"
}
func (d demoService) Endpoint() string {
return "http://127.0.0.1:10001"
func (d demoService) ServiceType() common.ServiceType {
return common.SERVICE_NODE_MANAGER
}
func (d demoService) DetailInfo() (json.RawMessage, error) {
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment