Commit a5b3407b authored by vicotor's avatar vicotor

implement node

parent 6dab9d9c
...@@ -67,7 +67,11 @@ func initConfig() { ...@@ -67,7 +67,11 @@ func initConfig() {
} }
func runNode() { func runNode() {
server.StartService() n := server.NewNode()
if err := n.Start(); err != nil {
log.WithError(err).Error("run node failed")
return
}
wg := sync.WaitGroup{} wg := sync.WaitGroup{}
wg.Add(1) wg.Add(1)
......
endpoint=":10001"
metrics_port = 28010
[redis]
addr="127.0.0.1:6379"
password=""
db=0
\ No newline at end of file
...@@ -6,17 +6,21 @@ import ( ...@@ -6,17 +6,21 @@ import (
"io/ioutil" "io/ioutil"
) )
type RedisConfig struct {
Addr string `json:"addr" toml:"addr"`
Password string `json:"password" toml:"password"`
DbIndex int `json:"db_index" toml:"db_index"`
}
type Config struct { type Config struct {
Endpoint string `json:"endpoint" toml:"endpoint"` PrivateKey string `json:"private_key" toml:"private_key"`
MetricsPort int `json:"metrics_port" toml:"metrics_port"` Endpoint string `json:"endpoint" toml:"endpoint"`
MetricPort int `json:"metrics_port" toml:"metrics_port"`
Redis RedisConfig `json:"redis" toml:"redis"`
} }
var _cfg *Config = nil var _cfg *Config = nil
func (conf *Config) MetricPort() int {
return conf.MetricsPort
}
func ParseConfig(path string) (*Config, error) { func ParseConfig(path string) (*Config, error) {
data, err := ioutil.ReadFile(path) data, err := ioutil.ReadFile(path)
if err != nil { if err != nil {
...@@ -35,16 +39,3 @@ func ParseConfig(path string) (*Config, error) { ...@@ -35,16 +39,3 @@ func ParseConfig(path string) (*Config, error) {
func GetConfig() *Config { func GetConfig() *Config {
return _cfg return _cfg
} }
var (
DefaultCors = []string{"localhost"} // Default cors domain for the apis
DefaultVhosts = []string{"localhost"} // Default virtual hosts for the apis
DefaultOrigins = []string{"localhost"} // Default origins for the apis
DefaultPrefix = "" // Default prefix for the apis
DefaultModules = []string{"time"}
)
const (
APIBatchItemLimit = 2000
APIBatchResponseSizeLimit = 250 * 1000 * 1000
)
package config
const (
NODE_MANAGER_SET = "node_manager_set"
)
...@@ -4,9 +4,11 @@ go 1.18 ...@@ -4,9 +4,11 @@ go 1.18
require ( require (
github.com/BurntSushi/toml v1.3.2 github.com/BurntSushi/toml v1.3.2
github.com/ethereum/go-ethereum v1.13.10
github.com/lestrrat-go/file-rotatelogs v2.4.0+incompatible github.com/lestrrat-go/file-rotatelogs v2.4.0+incompatible
github.com/odysseus/odysseus-protocol v0.0.0-00010101000000-000000000000 github.com/odysseus/odysseus-protocol v0.0.0-00010101000000-000000000000
github.com/prometheus/client_golang v1.18.0 github.com/prometheus/client_golang v1.18.0
github.com/redis/go-redis/v9 v9.4.0
github.com/rifflock/lfshook v0.0.0-20180920164130-b9218ef580f5 github.com/rifflock/lfshook v0.0.0-20180920164130-b9218ef580f5
github.com/sirupsen/logrus v1.9.3 github.com/sirupsen/logrus v1.9.3
github.com/spf13/cobra v1.8.0 github.com/spf13/cobra v1.8.0
...@@ -16,11 +18,15 @@ require ( ...@@ -16,11 +18,15 @@ require (
require ( require (
github.com/beorn7/perks v1.0.1 // indirect github.com/beorn7/perks v1.0.1 // indirect
github.com/btcsuite/btcd/btcec/v2 v2.2.0 // indirect
github.com/cespare/xxhash/v2 v2.2.0 // indirect github.com/cespare/xxhash/v2 v2.2.0 // indirect
github.com/decred/dcrd/dcrec/secp256k1/v4 v4.0.1 // indirect
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect
github.com/fsnotify/fsnotify v1.7.0 // indirect github.com/fsnotify/fsnotify v1.7.0 // indirect
github.com/gogo/protobuf v1.3.2 // indirect github.com/gogo/protobuf v1.3.2 // indirect
github.com/golang/protobuf v1.5.3 // indirect github.com/golang/protobuf v1.5.3 // indirect
github.com/hashicorp/hcl v1.0.0 // indirect github.com/hashicorp/hcl v1.0.0 // indirect
github.com/holiman/uint256 v1.2.4 // indirect
github.com/inconshreveable/mousetrap v1.1.0 // indirect github.com/inconshreveable/mousetrap v1.1.0 // indirect
github.com/jonboulle/clockwork v0.4.0 // indirect github.com/jonboulle/clockwork v0.4.0 // indirect
github.com/lestrrat-go/strftime v1.0.6 // indirect github.com/lestrrat-go/strftime v1.0.6 // indirect
...@@ -41,7 +47,8 @@ require ( ...@@ -41,7 +47,8 @@ require (
github.com/subosito/gotenv v1.6.0 // indirect github.com/subosito/gotenv v1.6.0 // indirect
go.uber.org/atomic v1.9.0 // indirect go.uber.org/atomic v1.9.0 // indirect
go.uber.org/multierr v1.9.0 // indirect go.uber.org/multierr v1.9.0 // indirect
golang.org/x/exp v0.0.0-20230905200255-921286631fa9 // indirect golang.org/x/crypto v0.17.0 // indirect
golang.org/x/exp v0.0.0-20231110203233-9a3e6036ecaa // indirect
golang.org/x/net v0.19.0 // indirect golang.org/x/net v0.19.0 // indirect
golang.org/x/sys v0.15.0 // indirect golang.org/x/sys v0.15.0 // indirect
golang.org/x/text v0.14.0 // indirect golang.org/x/text v0.14.0 // indirect
......
...@@ -2,12 +2,25 @@ github.com/BurntSushi/toml v1.3.2 h1:o7IhLm0Msx3BaB+n3Ag7L8EVlByGnpq14C4YWiu/gL8 ...@@ -2,12 +2,25 @@ github.com/BurntSushi/toml v1.3.2 h1:o7IhLm0Msx3BaB+n3Ag7L8EVlByGnpq14C4YWiu/gL8
github.com/BurntSushi/toml v1.3.2/go.mod h1:CxXYINrC8qIiEnFrOxCa7Jy5BFHlXnUU2pbicEuybxQ= github.com/BurntSushi/toml v1.3.2/go.mod h1:CxXYINrC8qIiEnFrOxCa7Jy5BFHlXnUU2pbicEuybxQ=
github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM= github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM=
github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw= github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw=
github.com/bsm/ginkgo/v2 v2.12.0 h1:Ny8MWAHyOepLGlLKYmXG4IEkioBysk6GpaRTLC8zwWs=
github.com/bsm/gomega v1.27.10 h1:yeMWxP2pV2fG3FgAODIY8EiRE3dy0aeFYt4l7wh6yKA=
github.com/btcsuite/btcd/btcec/v2 v2.2.0 h1:fzn1qaOt32TuLjFlkzYSsBC35Q3KUjT1SwPxiMSCF5k=
github.com/btcsuite/btcd/btcec/v2 v2.2.0/go.mod h1:U7MHm051Al6XmscBQ0BoNydpOTsFAn707034b5nY8zU=
github.com/btcsuite/btcd/chaincfg/chainhash v1.0.1 h1:q0rUy8C/TYNBQS1+CGKw68tLOFYSNEs0TFnxxnS9+4U=
github.com/cespare/xxhash/v2 v2.2.0 h1:DC2CZ1Ep5Y4k3ZQ899DldepgrayRUGE6BBZ/cd9Cj44= github.com/cespare/xxhash/v2 v2.2.0 h1:DC2CZ1Ep5Y4k3ZQ899DldepgrayRUGE6BBZ/cd9Cj44=
github.com/cespare/xxhash/v2 v2.2.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= github.com/cespare/xxhash/v2 v2.2.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
github.com/cpuguy83/go-md2man/v2 v2.0.3/go.mod h1:tgQtvFlXSQOSOSIRvRPT7W67SCa46tRHOmNcaadrF8o= github.com/cpuguy83/go-md2man/v2 v2.0.3/go.mod h1:tgQtvFlXSQOSOSIRvRPT7W67SCa46tRHOmNcaadrF8o=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc h1:U9qPSI2PIWSS1VwoXQT9A3Wy9MM3WgvqSxFWenqJduM= github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc h1:U9qPSI2PIWSS1VwoXQT9A3Wy9MM3WgvqSxFWenqJduM=
github.com/decred/dcrd/crypto/blake256 v1.0.0 h1:/8DMNYp9SGi5f0w7uCm6d6M4OU2rGFK09Y2A4Xv7EE0=
github.com/decred/dcrd/crypto/blake256 v1.0.0/go.mod h1:sQl2p6Y26YV+ZOcSTP6thNdn47hh8kt6rqSlvmrXFAc=
github.com/decred/dcrd/dcrec/secp256k1/v4 v4.0.1 h1:YLtO71vCjJRCBcrPMtQ9nqBsqpA1m5sE92cU+pd5Mcc=
github.com/decred/dcrd/dcrec/secp256k1/v4 v4.0.1/go.mod h1:hyedUtir6IdtD/7lIxGeCxkaw7y45JueMRL4DIyJDKs=
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f h1:lO4WD4F/rVNCu3HqELle0jiPLLBs70cWOduZpkS1E78=
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f/go.mod h1:cuUVRXasLTGF7a8hSLbxyZXjz+1KgoB3wDUb6vlszIc=
github.com/ethereum/go-ethereum v1.13.10 h1:Ppdil79nN+Vc+mXfge0AuUgmKWuVv4eMqzoIVSdqZek=
github.com/ethereum/go-ethereum v1.13.10/go.mod h1:sc48XYQxCzH3fG9BcrXCOOgQk2JfZzNAmIKnceogzsA=
github.com/frankban/quicktest v1.14.6 h1:7Xjx+VpznH+oBnejlPUj8oUpdxnVs4f8XU8WnHkI4W8= github.com/frankban/quicktest v1.14.6 h1:7Xjx+VpznH+oBnejlPUj8oUpdxnVs4f8XU8WnHkI4W8=
github.com/fsnotify/fsnotify v1.7.0 h1:8JEhPFa5W2WU7YfeZzPNqzMP6Lwt7L2715Ggo0nosvA= github.com/fsnotify/fsnotify v1.7.0 h1:8JEhPFa5W2WU7YfeZzPNqzMP6Lwt7L2715Ggo0nosvA=
github.com/fsnotify/fsnotify v1.7.0/go.mod h1:40Bi/Hjc2AVfZrqy+aj+yEI+/bRxZnMJyTJwOpGvigM= github.com/fsnotify/fsnotify v1.7.0/go.mod h1:40Bi/Hjc2AVfZrqy+aj+yEI+/bRxZnMJyTJwOpGvigM=
...@@ -20,6 +33,8 @@ github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/ ...@@ -20,6 +33,8 @@ github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/
github.com/google/go-cmp v0.5.9 h1:O2Tfq5qg4qc4AmwVlvv0oLiVAGB7enBSJ2x2DqQFi38= github.com/google/go-cmp v0.5.9 h1:O2Tfq5qg4qc4AmwVlvv0oLiVAGB7enBSJ2x2DqQFi38=
github.com/hashicorp/hcl v1.0.0 h1:0Anlzjpi4vEasTeNFn2mLJgTSwt0+6sfsiTG8qcWGx4= github.com/hashicorp/hcl v1.0.0 h1:0Anlzjpi4vEasTeNFn2mLJgTSwt0+6sfsiTG8qcWGx4=
github.com/hashicorp/hcl v1.0.0/go.mod h1:E5yfLk+7swimpb2L/Alb/PJmXilQ/rhwaUYs4T20WEQ= github.com/hashicorp/hcl v1.0.0/go.mod h1:E5yfLk+7swimpb2L/Alb/PJmXilQ/rhwaUYs4T20WEQ=
github.com/holiman/uint256 v1.2.4 h1:jUc4Nk8fm9jZabQuqr2JzednajVmBpC+oiTiXZJEApU=
github.com/holiman/uint256 v1.2.4/go.mod h1:EOMSn4q6Nyt9P6efbI3bueV4e1b3dGlUCXeiRV4ng7E=
github.com/inconshreveable/mousetrap v1.1.0 h1:wN+x4NVGpMsO7ErUn/mUI3vEoE6Jt13X2s0bqwp9tc8= github.com/inconshreveable/mousetrap v1.1.0 h1:wN+x4NVGpMsO7ErUn/mUI3vEoE6Jt13X2s0bqwp9tc8=
github.com/inconshreveable/mousetrap v1.1.0/go.mod h1:vpF70FUmC8bwa3OWnCshd2FqLfsEA9PFc4w1p2J65bw= github.com/inconshreveable/mousetrap v1.1.0/go.mod h1:vpF70FUmC8bwa3OWnCshd2FqLfsEA9PFc4w1p2J65bw=
github.com/jonboulle/clockwork v0.4.0 h1:p4Cf1aMWXnXAUh8lVfewRBx1zaTSYKrKMF2g3ST4RZ4= github.com/jonboulle/clockwork v0.4.0 h1:p4Cf1aMWXnXAUh8lVfewRBx1zaTSYKrKMF2g3ST4RZ4=
...@@ -54,6 +69,8 @@ github.com/prometheus/common v0.45.0 h1:2BGz0eBc2hdMDLnO/8n0jeB3oPrt2D08CekT0lne ...@@ -54,6 +69,8 @@ github.com/prometheus/common v0.45.0 h1:2BGz0eBc2hdMDLnO/8n0jeB3oPrt2D08CekT0lne
github.com/prometheus/common v0.45.0/go.mod h1:YJmSTw9BoKxJplESWWxlbyttQR4uaEcGyv9MZjVOJsY= github.com/prometheus/common v0.45.0/go.mod h1:YJmSTw9BoKxJplESWWxlbyttQR4uaEcGyv9MZjVOJsY=
github.com/prometheus/procfs v0.12.0 h1:jluTpSng7V9hY0O2R9DzzJHYb2xULk9VTR1V1R/k6Bo= github.com/prometheus/procfs v0.12.0 h1:jluTpSng7V9hY0O2R9DzzJHYb2xULk9VTR1V1R/k6Bo=
github.com/prometheus/procfs v0.12.0/go.mod h1:pcuDEFsWDnvcgNzo4EEweacyhjeA9Zk3cnaOZAZEfOo= github.com/prometheus/procfs v0.12.0/go.mod h1:pcuDEFsWDnvcgNzo4EEweacyhjeA9Zk3cnaOZAZEfOo=
github.com/redis/go-redis/v9 v9.4.0 h1:Yzoz33UZw9I/mFhx4MNrB6Fk+XHO1VukNcCa1+lwyKk=
github.com/redis/go-redis/v9 v9.4.0/go.mod h1:hdY0cQFCN4fnSYT6TkisLufl/4W5UIXyv0b/CLO2V2M=
github.com/rifflock/lfshook v0.0.0-20180920164130-b9218ef580f5 h1:mZHayPoR0lNmnHyvtYjDeq0zlVHn9K/ZXoy17ylucdo= github.com/rifflock/lfshook v0.0.0-20180920164130-b9218ef580f5 h1:mZHayPoR0lNmnHyvtYjDeq0zlVHn9K/ZXoy17ylucdo=
github.com/rifflock/lfshook v0.0.0-20180920164130-b9218ef580f5/go.mod h1:GEXHk5HgEKCvEIIrSpFI3ozzG5xOKA2DVlEX/gGnewM= github.com/rifflock/lfshook v0.0.0-20180920164130-b9218ef580f5/go.mod h1:GEXHk5HgEKCvEIIrSpFI3ozzG5xOKA2DVlEX/gGnewM=
github.com/rogpeppe/go-internal v1.10.0 h1:TMyTOH3F/DB16zRVcYyreMH6GnZZrwQVAoYjRBZyWFQ= github.com/rogpeppe/go-internal v1.10.0 h1:TMyTOH3F/DB16zRVcYyreMH6GnZZrwQVAoYjRBZyWFQ=
...@@ -96,8 +113,10 @@ go.uber.org/multierr v1.9.0/go.mod h1:X2jQV1h+kxSjClGpnseKVIxpmcjrj7MNnI0bnlfKTV ...@@ -96,8 +113,10 @@ go.uber.org/multierr v1.9.0/go.mod h1:X2jQV1h+kxSjClGpnseKVIxpmcjrj7MNnI0bnlfKTV
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
golang.org/x/exp v0.0.0-20230905200255-921286631fa9 h1:GoHiUyI/Tp2nVkLI2mCxVkOjsbSXD66ic0XW0js0R9g= golang.org/x/crypto v0.17.0 h1:r8bRNjWL3GshPW3gkd+RpvzWrZAwPS49OmTGZ/uhM4k=
golang.org/x/exp v0.0.0-20230905200255-921286631fa9/go.mod h1:S2oDrQGGwySpoQPVqRShND87VCbxmc6bL1Yd2oYrm6k= golang.org/x/crypto v0.17.0/go.mod h1:gCAAfMLgwOJRpTjQ2zCCt2OcSfYMTeZVSRtQlPC7Nq4=
golang.org/x/exp v0.0.0-20231110203233-9a3e6036ecaa h1:FRnLl4eNAQl8hwxVVC17teOw8kdjVDVAiFMtgUdTSRQ=
golang.org/x/exp v0.0.0-20231110203233-9a3e6036ecaa/go.mod h1:zk2irFbV9DP96SEBUUAy67IdHUaZuSnrz1n472HUCLE=
golang.org/x/mod v0.2.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= golang.org/x/mod v0.2.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
golang.org/x/mod v0.3.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= golang.org/x/mod v0.3.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
......
...@@ -91,7 +91,7 @@ func NewRegisteredMeter(subname string, name string) prometheus.Counter { ...@@ -91,7 +91,7 @@ func NewRegisteredMeter(subname string, name string) prometheus.Counter {
} }
func StartMetrics() { func StartMetrics() {
addr := fmt.Sprintf(":%d", config.GetConfig().MetricPort()) addr := fmt.Sprintf(":%d", config.GetConfig().MetricPort)
http.Handle("/metrics", promhttp.Handler()) http.Handle("/metrics", promhttp.Handler())
err := http.ListenAndServe(addr, nil) err := http.ListenAndServe(addr, nil)
if err != nil { if err != nil {
......
package nmregistry
type ManagerFilter interface {
Filter(RegistryInfo) bool
}
package nmregistry
import (
"context"
"fmt"
"github.com/odysseus/nodemanager/config"
"github.com/odysseus/nodemanager/utils"
"github.com/redis/go-redis/v9"
log "github.com/sirupsen/logrus"
"sync"
"time"
)
// nmregistry implement register nodemanger to redis, and get nodemanger list from redis.
// filter some nodemanager to worker.
type RegistryInfo struct {
Pubkey string `redis:"pubkey"`
Timestamp int64 `redis:"timestamp"`
Endpoint string `redis:"endpoint"`
Addr string `redis:"addr"`
}
type NodeManagerInfo struct {
Pubkey string `json:"pubkey"`
Endpoint string `json:"endpoint"`
}
type RegistryService struct {
nodelist []RegistryInfo
rdb *redis.Client
conf *config.Config
rw sync.RWMutex
quit chan struct{}
}
func NewRegistryService(conf *config.Config, rdb *redis.Client) *RegistryService {
return &RegistryService{
rdb: rdb,
conf: conf,
quit: make(chan struct{}),
}
}
func (s *RegistryService) Start() {
ticker := time.NewTicker(time.Second * 20)
defer ticker.Stop()
refresh := time.NewTicker(time.Second * 5)
defer refresh.Stop()
for {
select {
case <-s.quit:
return
case <-ticker.C:
if err := s.registry(s.rdb); err != nil {
log.WithError(err).Error("registry failed")
}
case <-refresh.C:
if nodes, err := s.allNodeManager(s.rdb); err != nil {
log.WithError(err).Error("refresh all nodemanager failed")
} else {
s.rw.Lock()
s.nodelist = nodes
s.rw.Unlock()
refresh.Reset(time.Second * 60)
}
}
}
}
func (s *RegistryService) Stop() {
close(s.quit)
}
func (s *RegistryService) registry(rdb *redis.Client) error {
priv, err := utils.HexToPrivatekey(s.conf.PrivateKey)
if err != nil {
panic(fmt.Sprintf("invalid private key: %s", err))
}
addr := utils.PrivatekeyToHex(priv)
pubHex := utils.PubkeyToHex(&priv.PublicKey)
err = rdb.HSet(context.Background(), config.NODE_MANAGER_SET+addr, RegistryInfo{
Pubkey: pubHex,
Timestamp: time.Now().Unix(),
Endpoint: config.GetConfig().Endpoint,
Addr: addr,
}).Err()
return err
}
func (s *RegistryService) GetNodeManagerList(filter ManagerFilter) []NodeManagerInfo {
s.rw.RLock()
defer s.rw.RUnlock()
var ret []NodeManagerInfo
for _, node := range s.nodelist {
if filter == nil || filter.Filter(node) {
ret = append(ret, NodeManagerInfo{
Pubkey: node.Pubkey,
Endpoint: node.Endpoint,
})
}
}
return ret
}
func (s *RegistryService) allNodeManager(rdb *redis.Client) ([]RegistryInfo, error) {
var ret []RegistryInfo
var tsExpired = 30
keys, err := rdb.Keys(context.Background(), config.NODE_MANAGER_SET+"*").Result()
if err != nil {
return nil, err
}
for _, key := range keys {
res := rdb.HGetAll(context.Background(), key)
var info RegistryInfo
if err := res.Scan(&info); err != nil {
continue
}
if time.Now().Unix()-info.Timestamp > int64(tsExpired) {
// heart beat expired, ignore this nodemanager
continue
}
ret = append(ret, info)
}
return ret, nil
}
package server
import (
"github.com/odysseus/nodemanager/config"
"github.com/odysseus/nodemanager/nmregistry"
"github.com/odysseus/nodemanager/utils"
omanager "github.com/odysseus/odysseus-protocol/gen/proto/go/nodemanager/v1"
"github.com/redis/go-redis/v9"
log "github.com/sirupsen/logrus"
"google.golang.org/grpc"
"net"
)
type Node struct {
registry *nmregistry.RegistryService
apiServer *grpc.Server
rdb *redis.Client
}
func NewNode() *Node {
rdb := utils.NewRedisClient(utils.RedisConnParam{
Addr: "",
Password: "",
DbIndex: 0,
})
node := &Node{
rdb: rdb,
apiServer: grpc.NewServer(grpc.MaxSendMsgSize(1024*1024*20), grpc.MaxRecvMsgSize(1024*1024*20)),
registry: nmregistry.NewRegistryService(config.GetConfig(), rdb),
}
return node
}
func (n *Node) Start() error {
n.registry.Start()
if err := n.apiStart(); err != nil {
return err
}
return nil
}
func (n *Node) apiStart() error {
lis, err := net.Listen("tcp", config.GetConfig().Endpoint)
if err != nil {
log.WithError(err).Error("failed to listen endpoint")
return err
}
omanager.RegisterNodeManagerServiceServer(n.apiServer, &NodeManagerService{
quit: make(chan struct{}),
node: n,
})
err = n.apiServer.Serve(lis)
if err != nil {
log.WithError(err).Error("failed to serve apiserver")
return err
}
return nil
}
func (n *Node) Stop() {
n.registry.Stop()
n.apiServer.Stop()
}
package server package server
import ( import (
"context"
"errors" "errors"
"fmt"
"github.com/odysseus/nodemanager/config"
"github.com/odysseus/nodemanager/server/apibackend"
"net"
omanager "github.com/odysseus/odysseus-protocol/gen/proto/go/nodemanager/v1" omanager "github.com/odysseus/odysseus-protocol/gen/proto/go/nodemanager/v1"
"google.golang.org/grpc"
) )
var ( var (
...@@ -17,28 +12,44 @@ var ( ...@@ -17,28 +12,44 @@ var (
) )
type NodeManagerService struct { type NodeManagerService struct {
backend apibackend.Backend node *Node
quit chan struct{} quit chan struct{}
omanager.UnimplementedNodeManagerServiceServer omanager.UnimplementedNodeManagerServiceServer
} }
func registerService(server *grpc.Server) { func (n *NodeManagerService) ManagerList(ctx context.Context, request *omanager.ManagerListRequest) (*omanager.ManagerListResponse, error) {
bend := apibackend.NewBackend() list := n.node.registry.GetNodeManagerList(nil)
omanager.RegisterNodeManagerServiceServer(server, &NodeManagerService{backend: bend, quit: make(chan struct{})}) res := new(omanager.ManagerListResponse)
res.Managers = make([]omanager.NodeManagerInfo, 0, len(list))
for _, v := range list {
res.Managers = append(res.Managers, omanager.NodeManagerInfo{
Publickey: v.Pubkey,
Endpoint: v.Endpoint,
})
}
return res, nil
} }
func StartService() { func (n *NodeManagerService) RegisterWorker(worker omanager.NodeManagerService_RegisterWorkerServer) error {
lis, err := net.Listen("tcp", config.GetConfig().Endpoint) //workerHandler := func() {
if err != nil { // for {
fmt.Printf("failed to listen: %v", err) // select {
return // case <-n.quit:
} // return
s := grpc.NewServer(grpc.MaxSendMsgSize(1024*1024*20), grpc.MaxRecvMsgSize(1024*1024*20)) // default:
registerService(s) // msg, err := worker.Recv()
// if err != nil {
// log.WithError(err).WithField("worker", "wwww").Error("recv msg failed")
// return
// }
// }
// }
//}
return nil
err = s.Serve(lis) }
if err != nil {
fmt.Printf("failed to serve: %v", err) func (n *NodeManagerService) DispatchTask(ctx context.Context, request *omanager.DispatchTaskRequest) (*omanager.DispatchTaskResponse, error) {
return //TODO implement me
} panic("implement me")
} }
package utils
import (
"crypto/ecdsa"
"encoding/hex"
"github.com/ethereum/go-ethereum/crypto"
)
func HexToPrivatekey(key string) (*ecdsa.PrivateKey, error) {
return crypto.HexToECDSA(key)
}
func PrivatekeyToHex(key *ecdsa.PrivateKey) string {
return hex.EncodeToString(crypto.FromECDSA(key))
}
func PrivatekeyToAddress(key *ecdsa.PrivateKey) string {
return crypto.PubkeyToAddress(key.PublicKey).String()
}
func PubkeyToAddress(key *ecdsa.PublicKey) string {
return crypto.PubkeyToAddress(*key).String()
}
func PubkeyToHex(key *ecdsa.PublicKey) string {
pub := crypto.FromECDSAPub(key)
return hex.EncodeToString(pub)
}
func HexToPubkey(key string) (*ecdsa.PublicKey, error) {
pub, err := hex.DecodeString(key)
if err != nil {
return nil, err
}
return crypto.UnmarshalPubkey(pub)
}
package utils
import (
"github.com/redis/go-redis/v9"
)
type RedisConnParam struct {
Addr string
Password string
DbIndex int
}
func NewRedisClient(param RedisConnParam) *redis.Client {
return redis.NewClient(&redis.Options{
Addr: param.Addr,
Password: param.Password,
DB: param.DbIndex,
})
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment