Commit 9939b34c authored by luxq's avatar luxq

add registry

parent dfaa6c5b
.idea
.vscode
package common
import "fmt"
const (
NODE_MANAGER_SET = "node_manager_set"
SCHEDULER_SET = "scheduler_set"
)
func GetServiceKeyPrefix(serviceType ServiceType) string {
return fmt.Sprintf("s:%s:", ServiceTypeMap[serviceType])
}
package common
type ServiceType int
const (
SERVICE_UNKNOWN ServiceType = iota
SERVICE_API_GATEWAY = iota
SERVICE_BACKEND
SERVICE_NODE_MANAGER
SERVICE_SCHEDULER
)
var ServiceTypeMap = map[ServiceType]string{
SERVICE_UNKNOWN: "unknown",
SERVICE_API_GATEWAY: "api_gateway",
SERVICE_BACKEND: "backend",
SERVICE_NODE_MANAGER: "node_manager",
SERVICE_SCHEDULER: "scheduler",
}
func (s ServiceType) String() string {
return ServiceTypeMap[s]
}
module github.com/odysseus/service-registry
go 1.18
require (
github.com/ethereum/go-ethereum v1.13.13
github.com/redis/go-redis/v9 v9.5.1
github.com/sirupsen/logrus v1.9.3
)
require (
github.com/btcsuite/btcd/btcec/v2 v2.2.0 // indirect
github.com/cespare/xxhash/v2 v2.2.0 // indirect
github.com/decred/dcrd/dcrec/secp256k1/v4 v4.0.1 // indirect
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect
github.com/holiman/uint256 v1.2.4 // indirect
golang.org/x/crypto v0.17.0 // indirect
golang.org/x/sys v0.16.0 // indirect
)
github.com/bsm/ginkgo/v2 v2.12.0 h1:Ny8MWAHyOepLGlLKYmXG4IEkioBysk6GpaRTLC8zwWs=
github.com/bsm/gomega v1.27.10 h1:yeMWxP2pV2fG3FgAODIY8EiRE3dy0aeFYt4l7wh6yKA=
github.com/btcsuite/btcd/btcec/v2 v2.2.0 h1:fzn1qaOt32TuLjFlkzYSsBC35Q3KUjT1SwPxiMSCF5k=
github.com/btcsuite/btcd/btcec/v2 v2.2.0/go.mod h1:U7MHm051Al6XmscBQ0BoNydpOTsFAn707034b5nY8zU=
github.com/btcsuite/btcd/chaincfg/chainhash v1.0.1 h1:q0rUy8C/TYNBQS1+CGKw68tLOFYSNEs0TFnxxnS9+4U=
github.com/cespare/xxhash/v2 v2.2.0 h1:DC2CZ1Ep5Y4k3ZQ899DldepgrayRUGE6BBZ/cd9Cj44=
github.com/cespare/xxhash/v2 v2.2.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/decred/dcrd/crypto/blake256 v1.0.0 h1:/8DMNYp9SGi5f0w7uCm6d6M4OU2rGFK09Y2A4Xv7EE0=
github.com/decred/dcrd/crypto/blake256 v1.0.0/go.mod h1:sQl2p6Y26YV+ZOcSTP6thNdn47hh8kt6rqSlvmrXFAc=
github.com/decred/dcrd/dcrec/secp256k1/v4 v4.0.1 h1:YLtO71vCjJRCBcrPMtQ9nqBsqpA1m5sE92cU+pd5Mcc=
github.com/decred/dcrd/dcrec/secp256k1/v4 v4.0.1/go.mod h1:hyedUtir6IdtD/7lIxGeCxkaw7y45JueMRL4DIyJDKs=
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f h1:lO4WD4F/rVNCu3HqELle0jiPLLBs70cWOduZpkS1E78=
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f/go.mod h1:cuUVRXasLTGF7a8hSLbxyZXjz+1KgoB3wDUb6vlszIc=
github.com/ethereum/go-ethereum v1.13.13 h1:KYn9w7pEWRI9oyZOzO94OVbctSusPByHdFDPj634jII=
github.com/ethereum/go-ethereum v1.13.13/go.mod h1:TN8ZiHrdJwSe8Cb6x+p0hs5CxhJZPbqB7hHkaUXcmIU=
github.com/holiman/uint256 v1.2.4 h1:jUc4Nk8fm9jZabQuqr2JzednajVmBpC+oiTiXZJEApU=
github.com/holiman/uint256 v1.2.4/go.mod h1:EOMSn4q6Nyt9P6efbI3bueV4e1b3dGlUCXeiRV4ng7E=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/redis/go-redis/v9 v9.5.1 h1:H1X4D3yHPaYrkL5X06Wh6xNVM/pX0Ft4RV0vMGvLBh8=
github.com/redis/go-redis/v9 v9.5.1/go.mod h1:hdY0cQFCN4fnSYT6TkisLufl/4W5UIXyv0b/CLO2V2M=
github.com/sirupsen/logrus v1.9.3 h1:dueUQJ1C2q9oE3F7wvmSGAaVtTmUizReu6fjN8uqzbQ=
github.com/sirupsen/logrus v1.9.3/go.mod h1:naHLuLoDiP4jHNo9R0sCBMtWGeIprob74mVsIT4qYEQ=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk=
golang.org/x/crypto v0.17.0 h1:r8bRNjWL3GshPW3gkd+RpvzWrZAwPS49OmTGZ/uhM4k=
golang.org/x/crypto v0.17.0/go.mod h1:gCAAfMLgwOJRpTjQ2zCCt2OcSfYMTeZVSRtQlPC7Nq4=
golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.16.0 h1:xWw16ngr6ZMtmxDyKyIgsE93KNKz5HKmMa3b8ALHidU=
golang.org/x/sys v0.16.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
package query
type GatewayInfo struct {
Timestamp int64 `redis:"timestamp" json:"timestamp"`
Endpoint string `redis:"endpoint" json:"endpoint"`
}
func (g GatewayInfo) TimeStamp() int64 {
return g.Timestamp
}
type gatewayQuery struct {
}
func getGatewayQuery() ServiceQuery {
return gatewayQuery{}
}
func (g gatewayQuery) ModuleName() string {
return "gateway"
}
func (g gatewayQuery) List() ([]string, error) {
return []string{}, nil
}
func (g gatewayQuery) ServiceInfo(serviceid string) (string, error) {
return "", nil
}
package query
type BackendInfo struct {
Timestamp int64 `redis:"timestamp" json:"timestamp"`
Endpoint string `redis:"endpoint" json:"endpoint"`
}
func (g BackendInfo) TimeStamp() int64 {
return g.Timestamp
}
type backendInfoQuery struct {
}
func getBackendInfoQuery() ServiceQuery {
return backendInfoQuery{}
}
func (g backendInfoQuery) ModuleName() string {
return "backend"
}
func (g backendInfoQuery) List() ([]string, error) {
return []string{}, nil
}
func (g backendInfoQuery) ServiceInfo(serviceid string) (string, error) {
return "", nil
}
package query
type ServiceQuery interface {
ModuleName() string
List() ([]string, error)
ServiceInfo(serviceid string) (string, error)
}
type TimeSortable interface {
TimeStamp() int64
}
type TimeSortableSlice []TimeSortable
func (s TimeSortableSlice) Len() int {
return len(s)
}
func (s TimeSortableSlice) Swap(i, j int) {
s[i], s[j] = s[j], s[i]
}
func (s TimeSortableSlice) Less(i, j int) bool {
return s[i].TimeStamp() < s[j].TimeStamp()
}
package query
type NodeManagerInfo struct {
Timestamp int64 `redis:"timestamp" json:"timestamp"`
Endpoint string `redis:"endpoint" json:"endpoint"`
}
func (g NodeManagerInfo) TimeStamp() int64 {
return g.Timestamp
}
type nodeManagerQuery struct {
}
func getNodeManagerQuery() ServiceQuery {
return nodeManagerQuery{}
}
func (g nodeManagerQuery) ModuleName() string {
return "nodemanager"
}
func (g nodeManagerQuery) List() ([]string, error) {
return []string{}, nil
}
func (g nodeManagerQuery) ServiceInfo(serviceid string) (string, error) {
return "", nil
}
package query
type ServiceInfo struct {
ServiceName string `json:"service_name"`
}
func init() {
}
package query
type SchedulerInfo struct {
Timestamp int64 `redis:"timestamp" json:"timestamp"`
Endpoint string `redis:"endpoint" json:"endpoint"`
}
func (g SchedulerInfo) TimeStamp() int64 {
return g.Timestamp
}
type schedulerQuery struct {
}
func getSchedulerQuery() ServiceQuery {
return schedulerQuery{}
}
func (g schedulerQuery) ModuleName() string {
return "scheduler"
}
func (g schedulerQuery) List() ([]string, error) {
return []string{}, nil
}
func (g schedulerQuery) ServiceInfo(serviceid string) (string, error) {
return "", nil
}
package query
type WorkerInfo struct {
Timestamp int64 `redis:"timestamp" json:"timestamp"`
Endpoint string `redis:"endpoint" json:"endpoint"`
}
func (g WorkerInfo) TimeStamp() int64 {
return g.Timestamp
}
type workerQuery struct {
}
func getWorkerQuery() ServiceQuery {
return workerQuery{}
}
func (g workerQuery) ModuleName() string {
return "worker"
}
func (g workerQuery) List() ([]string, error) {
return []string{}, nil
}
func (g workerQuery) ServiceInfo(serviceid string) (string, error) {
return "", nil
}
package nmregistry
import (
"context"
"encoding/json"
"fmt"
"github.com/odysseus/service-registry/common"
"github.com/redis/go-redis/v9"
log "github.com/sirupsen/logrus"
"sync"
"time"
)
type RegistryInfo struct {
ServiceName string `redis:"service_name" json:"service_name"`
Timestamp int64 `redis:"timestamp" json:"timestamp"`
Endpoint string `redis:"endpoint" json:"endpoint"`
Detail json.RawMessage `redis:"detail" json:"detail"`
}
type Register interface {
ServiceType() common.ServiceType
Endpoint() string
DetailInfo() (json.RawMessage, error)
}
type Registry struct {
rdb *redis.Client
rw sync.RWMutex
register Register
quit chan struct{}
}
type RedisConnParam struct {
Addr string
Password string
DbIndex int
}
func NewRegistry(redisParam RedisConnParam, register Register) *Registry {
rdb := redis.NewClient(&redis.Options{
Addr: redisParam.Addr,
Password: redisParam.Password,
DB: redisParam.DbIndex,
})
return &Registry{
rdb: rdb,
register: register,
quit: make(chan struct{}),
}
}
func (s *Registry) Start() {
ticker := time.NewTicker(time.Second * 1)
defer ticker.Stop()
for {
select {
case <-s.quit:
return
case <-ticker.C:
if err := s.registry(s.rdb); err != nil {
log.WithError(err).Error("registry failed")
} else {
ticker.Reset(time.Second * 10)
}
}
}
}
func (s *Registry) Stop() {
close(s.quit)
}
func (s *Registry) registry(rdb *redis.Client) error {
k := fmt.Sprintf("%s%s", common.GetServiceKeyPrefix(s.register.ServiceType()), s.register.Endpoint())
detail, err := s.register.DetailInfo()
if err != nil {
log.WithError(err).Error("get detail info failed")
return err
}
err = rdb.HSet(context.Background(), k, RegistryInfo{
ServiceName: s.register.ServiceType().String(),
Timestamp: time.Now().Unix(),
Endpoint: s.register.Endpoint(),
Detail: detail,
}).Err()
if err != nil {
log.WithError(err).Error("set register info failed")
}
return err
}
package nmregistry
import (
"encoding/json"
"github.com/odysseus/service-registry/common"
"testing"
"time"
)
type demoService struct {
}
func (d demoService) ServiceType() common.ServiceType {
return common.SERVICE_NODE_MANAGER
}
func (d demoService) Endpoint() string {
return "http://127.0.0.1:10001"
}
func (d demoService) DetailInfo() (json.RawMessage, error) {
detail := struct {
IP string `json:"ip"`
WorkerCount int `json:"worker_count"`
}{}
detail.IP = ""
detail.WorkerCount = 10
return json.Marshal(detail)
}
func TestRegistry(t *testing.T) {
r := NewRegistry(RedisConnParam{
Addr: "localhost:6379",
Password: "",
DbIndex: 0,
}, demoService{})
r.Start()
time.Sleep(time.Second * 5)
r.Stop()
}
package utils
func CombineBytes(b ...[]byte) []byte {
var result []byte
for _, v := range b {
result = append(result, v...)
}
return result
}
package utils
import (
"crypto/ecdsa"
"encoding/hex"
"github.com/ethereum/go-ethereum/crypto"
"strings"
)
func HexToPrivatekey(key string) (*ecdsa.PrivateKey, error) {
return crypto.HexToECDSA(key)
}
func PrivatekeyToHex(key *ecdsa.PrivateKey) string {
return hex.EncodeToString(crypto.FromECDSA(key))
}
func PrivatekeyToAddress(key *ecdsa.PrivateKey) string {
return crypto.PubkeyToAddress(key.PublicKey).String()
}
func PubkeyToAddress(key *ecdsa.PublicKey) string {
return crypto.PubkeyToAddress(*key).String()
}
func PubkeyToHex(key *ecdsa.PublicKey) string {
pub := crypto.FromECDSAPub(key)
return hex.EncodeToString(pub)
}
func HexToPubkey(key string) (*ecdsa.PublicKey, error) {
if strings.HasPrefix(key, "0x") {
key = key[2:]
}
pub, err := hex.DecodeString(key)
if err != nil {
return nil, err
}
return crypto.UnmarshalPubkey(pub)
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment