Commit dd656801 authored by vicotor's avatar vicotor

update worker registry

parent de15185a
......@@ -11,7 +11,8 @@ WORKDIR /build
RUN git clone https://code.wuban.net.cn/odysseus/nodemanager && \
git clone https://code.wuban.net.cn/odysseus/odysseus-protocol && \
git clone https://code.wuban.net.cn/odysseus/cache
git clone https://code.wuban.net.cn/odysseus/cache && \
git clone https://code.wuban.net.cn/odysseus/service-registry
RUN cd /build/nodemanager && make && cp build/bin/manager /manager
......
......@@ -73,10 +73,6 @@ func (conf *Config) PublicEndpoint() string {
return fmt.Sprintf("%s:%d", conf.RemoteHost, conf.Port)
}
func (conf *Config) LocalEndpoint() string {
return fmt.Sprintf("%s:%d", conf.LocalHost, conf.Port)
}
func (conf *Config) ApiEndpoint() string {
return fmt.Sprintf("0.0.0.0:%d", conf.Port)
}
......
......@@ -5,4 +5,5 @@ const (
WORKER_STATUS_PREFIX = "worker_status_"
WORKER_QUEUE_PREFIX = "worker_queue_"
WORKER_DEVICE_INFO_PREFIX = "worker_device_info_"
WORKER_USAGE_INFO_PREFIX = "worker_usage_info_"
)
......@@ -6,15 +6,18 @@ require (
github.com/BurntSushi/toml v1.3.2
github.com/IBM/sarama v1.42.1
github.com/astaxie/beego v1.12.3
github.com/ethereum/go-ethereum v1.13.10
github.com/ethereum/go-ethereum v1.13.13
github.com/gogo/protobuf v1.3.2
github.com/golang/protobuf v1.5.3
github.com/google/uuid v1.5.0
github.com/hashicorp/golang-lru v0.5.4
github.com/lestrrat-go/file-rotatelogs v2.4.0+incompatible
github.com/odysseus/odysseus-protocol v0.0.0-00010101000000-000000000000
github.com/magiconair/properties v1.8.7
github.com/odysseus/cache v0.0.0-00010101000000-000000000000
github.com/odysseus/odysseus-protocol v0.0.0-00010101000000-000000000000
github.com/odysseus/service-registry v0.0.0-00010101000000-000000000000
github.com/prometheus/client_golang v1.18.0
github.com/redis/go-redis/v9 v9.4.0
github.com/redis/go-redis/v9 v9.5.1
github.com/rifflock/lfshook v0.0.0-20180920164130-b9218ef580f5
github.com/sirupsen/logrus v1.9.3
github.com/spf13/cobra v1.8.0
......@@ -37,7 +40,6 @@ require (
github.com/go-sql-driver/mysql v1.7.1 // indirect
github.com/golang/snappy v0.0.5-0.20220116011046-fa5810519dcb // indirect
github.com/gomodule/redigo v2.0.0+incompatible // indirect
github.com/google/uuid v1.5.0 // indirect
github.com/hashicorp/errwrap v1.0.0 // indirect
github.com/hashicorp/go-multierror v1.1.1 // indirect
github.com/hashicorp/go-uuid v1.0.3 // indirect
......@@ -52,7 +54,6 @@ require (
github.com/jonboulle/clockwork v0.4.0 // indirect
github.com/klauspost/compress v1.17.0 // indirect
github.com/lestrrat-go/strftime v1.0.6 // indirect
github.com/magiconair/properties v1.8.7 // indirect
github.com/matttproud/golang_protobuf_extensions/v2 v2.0.0 // indirect
github.com/mitchellh/mapstructure v1.5.0 // indirect
github.com/pelletier/go-toml/v2 v2.1.0 // indirect
......@@ -73,7 +74,7 @@ require (
go.uber.org/multierr v1.9.0 // indirect
golang.org/x/exp v0.0.0-20231110203233-9a3e6036ecaa // indirect
golang.org/x/net v0.19.0 // indirect
golang.org/x/sys v0.15.0 // indirect
golang.org/x/sys v0.16.0 // indirect
golang.org/x/text v0.14.0 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20231120223509-83a465c0220f // indirect
google.golang.org/protobuf v1.32.0 // indirect
......@@ -85,3 +86,5 @@ require (
replace github.com/odysseus/odysseus-protocol => ../odysseus-protocol
replace github.com/odysseus/cache => ../cache
replace github.com/odysseus/service-registry => ../service-registry
......@@ -53,8 +53,8 @@ github.com/eapache/queue v1.1.0/go.mod h1:6eCeP0CKFpHLu8blIFXhExK/dRa7WDZfr6jVFP
github.com/edsrzf/mmap-go v0.0.0-20170320065105-0bce6a688712/go.mod h1:YO35OhQPt3KJa3ryjFM5Bs14WD66h8eGKpfaBNrHW5M=
github.com/elastic/go-elasticsearch/v6 v6.8.5/go.mod h1:UwaDJsD3rWLM5rKNFzv9hgox93HoX8utj1kxD9aFUcI=
github.com/elazarl/go-bindata-assetfs v1.0.0/go.mod h1:v+YaWX3bdea5J/mo8dSETolEo7R71Vk1u8bnjau5yw4=
github.com/ethereum/go-ethereum v1.13.10 h1:Ppdil79nN+Vc+mXfge0AuUgmKWuVv4eMqzoIVSdqZek=
github.com/ethereum/go-ethereum v1.13.10/go.mod h1:sc48XYQxCzH3fG9BcrXCOOgQk2JfZzNAmIKnceogzsA=
github.com/ethereum/go-ethereum v1.13.13 h1:KYn9w7pEWRI9oyZOzO94OVbctSusPByHdFDPj634jII=
github.com/ethereum/go-ethereum v1.13.13/go.mod h1:TN8ZiHrdJwSe8Cb6x+p0hs5CxhJZPbqB7hHkaUXcmIU=
github.com/fortytw2/leaktest v1.3.0 h1:u8491cBMTQ8ft8aeV+adlcytMZylmA5nnwwkRZjI8vw=
github.com/frankban/quicktest v1.14.6 h1:7Xjx+VpznH+oBnejlPUj8oUpdxnVs4f8XU8WnHkI4W8=
github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo=
......@@ -206,8 +206,8 @@ github.com/prometheus/procfs v0.12.0 h1:jluTpSng7V9hY0O2R9DzzJHYb2xULk9VTR1V1R/k
github.com/prometheus/procfs v0.12.0/go.mod h1:pcuDEFsWDnvcgNzo4EEweacyhjeA9Zk3cnaOZAZEfOo=
github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475 h1:N/ElC8H3+5XpJzTSTfLsJV/mx9Q9g7kxmchpfZyxgzM=
github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475/go.mod h1:bCqnVzQkZxMG4s8nGwiZ5l3QUCyqpo9Y+/ZMZ9VjZe4=
github.com/redis/go-redis/v9 v9.4.0 h1:Yzoz33UZw9I/mFhx4MNrB6Fk+XHO1VukNcCa1+lwyKk=
github.com/redis/go-redis/v9 v9.4.0/go.mod h1:hdY0cQFCN4fnSYT6TkisLufl/4W5UIXyv0b/CLO2V2M=
github.com/redis/go-redis/v9 v9.5.1 h1:H1X4D3yHPaYrkL5X06Wh6xNVM/pX0Ft4RV0vMGvLBh8=
github.com/redis/go-redis/v9 v9.5.1/go.mod h1:hdY0cQFCN4fnSYT6TkisLufl/4W5UIXyv0b/CLO2V2M=
github.com/rifflock/lfshook v0.0.0-20180920164130-b9218ef580f5 h1:mZHayPoR0lNmnHyvtYjDeq0zlVHn9K/ZXoy17ylucdo=
github.com/rifflock/lfshook v0.0.0-20180920164130-b9218ef580f5/go.mod h1:GEXHk5HgEKCvEIIrSpFI3ozzG5xOKA2DVlEX/gGnewM=
github.com/rogpeppe/go-internal v1.10.0 h1:TMyTOH3F/DB16zRVcYyreMH6GnZZrwQVAoYjRBZyWFQ=
......@@ -315,8 +315,8 @@ golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a/go.mod h1:oPkhp1MJrh7nUepCBc
golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.5.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.15.0 h1:h48lPFYpsTvQJZF4EKyI4aLHaev3CxivZmv7yZig9pc=
golang.org/x/sys v0.15.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/sys v0.16.0 h1:xWw16ngr6ZMtmxDyKyIgsE93KNKz5HKmMa3b8ALHidU=
golang.org/x/sys v0.16.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8=
golang.org/x/term v0.5.0/go.mod h1:jMB1sMXY+tzblOD4FWmEbocvup2/aLOaQEp7JmGp78k=
......
package nmregistry
package nmregister
type ManagerFilter interface {
Filter(RegistryInfo) bool
Filter(FullNodeManagerInfo) bool
}
package nmregistry
package nmregister
import (
"context"
"crypto/ecdsa"
"encoding/json"
"fmt"
"github.com/odysseus/nodemanager/config"
"github.com/odysseus/nodemanager/utils"
"github.com/odysseus/service-registry/common"
"github.com/odysseus/service-registry/query"
"github.com/odysseus/service-registry/registry"
"github.com/redis/go-redis/v9"
log "github.com/sirupsen/logrus"
"os"
"sync"
"time"
)
......@@ -15,11 +19,9 @@ import (
// nmregistry implement register nodemanger to redis, and get nodemanger list from redis.
// filter some nodemanager to worker.
type RegistryInfo struct {
Pubkey string `redis:"pubkey"`
Timestamp int64 `redis:"timestamp"`
Endpoint string `redis:"endpoint"`
Addr string `redis:"addr"`
type FullNodeManagerInfo struct {
Reginfo registry.RegistryInfo
NmInfo query.NodeManagerInfo
}
type NodeManagerInfo struct {
......@@ -27,8 +29,9 @@ type NodeManagerInfo struct {
Endpoint string `json:"endpoint"`
}
type RegistryService struct {
nodelist []RegistryInfo
type NMRegister struct {
querier *query.ServiceQuerier
nodelist []FullNodeManagerInfo
rdb *redis.Client
conf *config.Config
rw sync.RWMutex
......@@ -36,18 +39,47 @@ type RegistryService struct {
quit chan struct{}
}
func NewRegistryService(conf *config.Config, rdb *redis.Client, public ecdsa.PublicKey) *RegistryService {
return &RegistryService{
rdb: rdb,
conf: conf,
public: public,
quit: make(chan struct{}),
func (s *NMRegister) ServiceType() common.ServiceType {
return common.SERVICE_NODE_MANAGER
}
func (s *NMRegister) Instance() string {
hname, _ := os.Hostname()
return fmt.Sprintf("%s:%s", s.conf.RemoteHost, hname)
}
func (s *NMRegister) Status() string {
return "running"
}
func (s *NMRegister) DetailInfo() (json.RawMessage, error) {
priv, err := utils.HexToPrivatekey(s.conf.PrivateKey)
if err != nil {
panic(fmt.Sprintf("invalid private key: %s", err))
}
addr := utils.PrivatekeyToHex(priv)
pubHex := utils.PubkeyToHex(&priv.PublicKey)
endpoint := s.conf.PublicEndpoint()
info := query.NodeManagerInfo{}
info.Addr = addr
info.Pubkey = pubHex
info.Endpoint = endpoint
return json.Marshal(info)
}
func NewNMRegister(conf *config.Config, querier *query.ServiceQuerier, public ecdsa.PublicKey) *NMRegister {
return &NMRegister{
querier: querier,
conf: conf,
public: public,
quit: make(chan struct{}),
}
}
func (s *RegistryService) Start() {
ticker := time.NewTicker(time.Second * 1)
defer ticker.Stop()
func (s *NMRegister) Start() {
refresh := time.NewTicker(time.Second * 5)
defer refresh.Stop()
......@@ -55,14 +87,8 @@ func (s *RegistryService) Start() {
select {
case <-s.quit:
return
case <-ticker.C:
if err := s.registry(s.rdb); err != nil {
log.WithError(err).Error("registry failed")
} else {
ticker.Reset(time.Second * 10)
}
case <-refresh.C:
if nodes, err := s.allNodeManager(s.rdb); err != nil {
if nodes, err := s.allNodeManager(); err != nil {
log.WithError(err).Error("refresh all nodemanager failed")
} else {
s.rw.Lock()
......@@ -75,31 +101,11 @@ func (s *RegistryService) Start() {
}
}
func (s *RegistryService) Stop() {
func (s *NMRegister) Stop() {
close(s.quit)
}
func (s *RegistryService) registry(rdb *redis.Client) error {
priv, err := utils.HexToPrivatekey(s.conf.PrivateKey)
if err != nil {
panic(fmt.Sprintf("invalid private key: %s", err))
}
addr := utils.PrivatekeyToHex(priv)
pubHex := utils.PubkeyToHex(&priv.PublicKey)
endpoint := s.conf.PublicEndpoint()
err = rdb.HSet(context.Background(), config.NODE_MANAGER_SET+addr, RegistryInfo{
Pubkey: pubHex,
Timestamp: time.Now().Unix(),
Endpoint: endpoint,
Addr: addr,
}).Err()
return err
}
func (s *RegistryService) GetNodeManagerList(filter ManagerFilter) []NodeManagerInfo {
func (s *NMRegister) GetNodeManagerList(filter ManagerFilter) []NodeManagerInfo {
s.rw.RLock()
defer s.rw.RUnlock()
var ret []NodeManagerInfo
......@@ -107,32 +113,38 @@ func (s *RegistryService) GetNodeManagerList(filter ManagerFilter) []NodeManager
for _, node := range s.nodelist {
if filter == nil || filter.Filter(node) {
ret = append(ret, NodeManagerInfo{
Pubkey: node.Pubkey,
Endpoint: node.Endpoint,
Pubkey: node.NmInfo.Pubkey,
Endpoint: node.NmInfo.Endpoint,
})
}
}
return ret
}
func (s *RegistryService) allNodeManager(rdb *redis.Client) ([]RegistryInfo, error) {
var ret []RegistryInfo
func (s *NMRegister) allNodeManager() ([]FullNodeManagerInfo, error) {
var ret []FullNodeManagerInfo
var tsExpired = 100
keys, err := rdb.Keys(context.Background(), config.NODE_MANAGER_SET+"*").Result()
nmlist, err := s.querier.Select(common.SERVICE_NODE_MANAGER).List()
if err != nil {
return nil, err
}
for _, key := range keys {
res := rdb.HGetAll(context.Background(), key)
var info RegistryInfo
if err := res.Scan(&info); err != nil {
for _, v := range nmlist {
var info registry.RegistryInfo
var nminfo query.NodeManagerInfo
if err := json.Unmarshal([]byte(v), &info); err != nil {
continue
}
if err := json.Unmarshal([]byte(info.Detail), &nminfo); err != nil {
continue
}
if time.Now().Unix()-info.Timestamp > int64(tsExpired) {
// heart beat expired, ignore this nodemanager
continue
}
ret = append(ret, info)
ret = append(ret, FullNodeManagerInfo{
Reginfo: info,
NmInfo: nminfo,
})
}
return ret, nil
}
......@@ -8,10 +8,12 @@ import (
"github.com/odysseus/cache/cachedata"
"github.com/odysseus/cache/model"
"github.com/odysseus/nodemanager/config"
"github.com/odysseus/nodemanager/nmregistry"
"github.com/odysseus/nodemanager/nmregister"
"github.com/odysseus/nodemanager/utils"
basev1 "github.com/odysseus/odysseus-protocol/gen/proto/go/base/v1"
omanager "github.com/odysseus/odysseus-protocol/gen/proto/go/nodemanager/v1"
"github.com/odysseus/service-registry/query"
"github.com/odysseus/service-registry/registry"
"github.com/redis/go-redis/v9"
log "github.com/sirupsen/logrus"
"google.golang.org/grpc"
......@@ -20,7 +22,8 @@ import (
)
type Node struct {
registry *nmregistry.RegistryService
register *nmregister.NMRegister
registry *registry.Registry
apiServer *grpc.Server
rdb *redis.Client
wm *WorkerManager
......@@ -32,6 +35,26 @@ type Node struct {
func NewNode() *Node {
redisConfig := config.GetConfig().Redis
privk, err := utils.HexToPrivatekey(config.GetConfig().PrivateKey)
if err != nil {
log.WithError(err).Error("failed to parse node manager private key")
return nil
}
querier := query.NewQuery(registry.RedisConnParam{
Addr: redisConfig.Addr,
Password: redisConfig.Password,
DbIndex: redisConfig.DbIndex,
})
register := nmregister.NewNMRegister(config.GetConfig(), querier, privk.PublicKey)
reg := registry.NewRegistry(registry.RedisConnParam{
Addr: redisConfig.Addr,
Password: redisConfig.Password,
DbIndex: redisConfig.DbIndex,
}, register)
rdb := utils.NewRedisClient(utils.RedisConnParam{
Addr: redisConfig.Addr,
Password: redisConfig.Password,
......@@ -50,19 +73,15 @@ func NewNode() *Node {
DbName: dbconf.DbName,
})
privk, err := utils.HexToPrivatekey(config.GetConfig().PrivateKey)
if err != nil {
log.WithError(err).Error("failed to parse node manager private key")
return nil
}
brokers := strings.Split(config.GetConfig().Kafka.Brokers, ";")
producer, _ := utils.NewKafkaProducer(brokers)
node := &Node{
register: register,
registry: reg,
rdb: rdb,
privk: privk,
cache: pay,
apiServer: grpc.NewServer(grpc.MaxSendMsgSize(1024*1024*20), grpc.MaxRecvMsgSize(1024*1024*20)),
registry: nmregistry.NewRegistryService(config.GetConfig(), rdb, privk.PublicKey),
kafkaProducer: producer,
taskResultCh: make(chan *basev1.TaskReceipt, 100000),
}
......@@ -79,6 +98,7 @@ func (n *Node) Sign(hash []byte) ([]byte, error) {
func (n *Node) Start() error {
go n.registry.Start()
go n.register.Start()
go n.postLoop()
if err := n.apiStart(); err != nil {
......@@ -134,6 +154,7 @@ func (n *Node) postLoop() {
func (n *Node) Stop() {
n.registry.Stop()
n.register.Stop()
n.apiServer.Stop()
close(n.taskResultCh)
}
......@@ -20,7 +20,7 @@ type NodeManagerService struct {
}
func (n *NodeManagerService) ManagerList(ctx context.Context, request *omanager.ManagerListRequest) (*omanager.ManagerListResponse, error) {
list := n.node.registry.GetNodeManagerList(nil)
list := n.node.register.GetNodeManagerList(nil)
res := new(omanager.ManagerListResponse)
res.Managers = make([]*omanager.NodeManagerInfo, 0, len(list))
for _, v := range list {
......
......@@ -14,6 +14,7 @@ import (
"github.com/odysseus/nodemanager/utils"
odysseus "github.com/odysseus/odysseus-protocol/gen/proto/go/base/v1"
omanager "github.com/odysseus/odysseus-protocol/gen/proto/go/nodemanager/v1"
"github.com/odysseus/service-registry/registry"
"github.com/redis/go-redis/v9"
log "github.com/sirupsen/logrus"
"golang.org/x/crypto/sha3"
......@@ -44,9 +45,10 @@ type Worker struct {
status []byte
online bool
usageInfo []*omanager.DeviceUsage
deviceInfo []*omanager.DeviceInfo
deviceInfo *omanager.DeviceInfoResponse
deviceInfoHash []byte
recentTask *lru.Cache
state string
stream omanager.NodeManagerService_RegisterWorkerServer
}
......@@ -63,10 +65,11 @@ type WorkerManager struct {
heartBeat map[int64]int64
hbRwLock sync.RWMutex
workers map[int64]*Worker
workid map[string]*Worker
wkRwLock sync.RWMutex
quit chan struct{}
workers map[int64]*Worker
workid map[string]*Worker
workerReg map[int64]*registry.Registry
wkRwLock sync.RWMutex
quit chan struct{}
node *Node
std *standardlib.StandardTasks
......@@ -94,10 +97,6 @@ func (wm *WorkerManager) UpdateHeartBeat(uuid int64) {
wm.heartBeat[uuid] = time.Now().Unix()
}
func (wm *WorkerManager) UpdateStatus(worker *Worker) {
}
func (wm *WorkerManager) GetHeartBeat(uuid int64) int64 {
wm.hbRwLock.RLock()
defer wm.hbRwLock.RUnlock()
......@@ -111,6 +110,21 @@ func (wm *WorkerManager) GetWorker(uuid int64) *Worker {
return wm.workers[uuid]
}
func (wm *WorkerManager) SetWorkerRegistry(uuid int64, reg *registry.Registry) {
wm.wkRwLock.Lock()
defer wm.wkRwLock.Unlock()
wm.workerReg[uuid] = reg
}
func (wm *WorkerManager) StopRegistry(uuid int64) {
wm.wkRwLock.Lock()
defer wm.wkRwLock.Unlock()
if reg, exist := wm.workerReg[uuid]; exist {
reg.Stop()
delete(wm.workerReg, uuid)
}
}
func (wm *WorkerManager) SetWorkerAddr(worker *Worker, addr string) {
wm.wkRwLock.Lock()
defer wm.wkRwLock.Unlock()
......@@ -194,14 +208,26 @@ func (wm *WorkerManager) manageWorker(worker *Worker) error {
deviceUsageTicker := time.NewTicker(initialInterval)
defer deviceUsageTicker.Stop()
reg := registry.NewRegistry(registry.RedisConnParam{
Addr: config.GetConfig().Redis.Addr,
Password: config.GetConfig().Redis.Password,
DbIndex: config.GetConfig().Redis.DbIndex,
}, workerRegistry{worker: worker, wm: wm})
wm.SetWorkerRegistry(worker.uuid, reg)
worker.state = "connected"
go reg.Start()
defer func() {
log.WithFields(log.Fields{
"worker-addr": worker.addr,
"worker-uuid": worker.uuid,
}).Info("exit manage worker")
worker.online = false
worker.state = "disconnected"
wm.InActiveWorker(worker)
wm.StopRegistry(worker.uuid)
}()
for {
......@@ -431,7 +457,7 @@ func (wm *WorkerManager) handleWorkerMsg(worker *Worker) {
// receive device info
worker.online = true
worker.publicKey = msg.DeviceInfo.MinerPubkey
worker.deviceInfo = msg.DeviceInfo.Devices
worker.deviceInfo = msg.DeviceInfo
worker.benefitAddr = msg.DeviceInfo.BenefitAddress
var addr = ""
if pubkey, err := utils.HexToPubkey(worker.publicKey); err != nil {
......
package server
import (
"encoding/json"
"fmt"
"github.com/odysseus/service-registry/common"
"github.com/odysseus/service-registry/query"
"github.com/odysseus/service-registry/registry"
)
var (
wr registry.Register = workerRegistry{}
)
type workerRegistry struct {
worker *Worker
wm *WorkerManager
}
func (w workerRegistry) ServiceType() common.ServiceType {
return common.SERVICE_WORKER
}
func (w workerRegistry) Instance() string {
return fmt.Sprintf("%s", w.worker.addr)
}
func (w workerRegistry) Status() string {
return fmt.Sprintf("%s", w.worker.status)
}
func (w workerRegistry) DetailInfo() (json.RawMessage, error) {
if w.worker == nil {
return nil, fmt.Errorf("worker is nil")
}
if w.worker.addr == "" {
return nil, fmt.Errorf("worker address is empty")
}
info := query.WorkerInfo{}
info.BenefitAddress = w.worker.benefitAddr
if w.worker.deviceInfo != nil {
info.IP = w.worker.deviceInfo.DeviceIps[0]
}
info.ActiveNM, _ = w.wm.WorkerNmList(w.worker)
info.HearBeat = w.wm.GetHeartBeat(w.worker.uuid)
info.MinerAddress = w.worker.addr
return json.Marshal(info)
}
......@@ -15,7 +15,7 @@ func (wm *WorkerManager) UpdateWorkerDeviceInfo(worker *Worker, deviceInfos stri
func (wm *WorkerManager) AddWorkerFirst(worker *Worker) error {
log.WithField("worker", worker.addr).Info("add worker first time.")
for _, device := range worker.deviceInfo {
for _, device := range worker.deviceInfo.Devices {
// add device to redis
priority := 0
_ = device // todo: set priority with device info.
......@@ -47,6 +47,10 @@ func (wm *WorkerManager) ActiveWorker(worker *Worker) {
wm.rdb.SAdd(context.Background(), workerStatusKey(worker), config.GetConfig().PublicEndpoint())
}
func (wm *WorkerManager) WorkerNmList(worker *Worker) ([]string, error) {
return wm.rdb.SMembers(context.Background(), workerStatusKey(worker)).Result()
}
func (wm *WorkerManager) InActiveWorker(worker *Worker) {
wm.rdb.SRem(context.Background(), workerStatusKey(worker), config.GetConfig().PublicEndpoint())
}
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment