Commit 41802e81 authored by vicotor's avatar vicotor

update nm

parent b3a690af
...@@ -20,13 +20,21 @@ type RedisConfig struct { ...@@ -20,13 +20,21 @@ type RedisConfig struct {
DbIndex int `json:"db_index" toml:"db_index"` DbIndex int `json:"db_index" toml:"db_index"`
} }
type TickerConfig struct {
HeartBeat int `json:"heart_beat" toml:"heart_beat"`
StatusTicker int `json:"status_ticker" toml:"status_ticker"`
DeviceInfoTicker int `json:"device_info_ticker" toml:"device_info_ticker"`
DeviceUsageTicker int `json:"device_usage_ticker" toml:"device_usage_ticker"`
}
type Config struct { type Config struct {
PrivateKey string `json:"private_key" toml:"private_key"` PrivateKey string `json:"private_key" toml:"private_key"`
Endpoint string `json:"endpoint" toml:"endpoint"` Endpoint string `json:"endpoint" toml:"endpoint"`
MetricPort int `json:"metrics_port" toml:"metrics_port"` MetricPort int `json:"metrics_port" toml:"metrics_port"`
EnablePay bool `json:"enable_pay" toml:"enable_pay"` EnablePay bool `json:"enable_pay" toml:"enable_pay"`
Redis RedisConfig `json:"redis" toml:"redis"` Redis RedisConfig `json:"redis" toml:"redis"`
DbConfig MysqlConfig `json:"mysql" toml:"mysql"` DbConfig MysqlConfig `json:"mysql" toml:"mysql"`
Tickers TickerConfig `json:"ticker" toml:"ticker"`
} }
var _cfg *Config = nil var _cfg *Config = nil
......
...@@ -35,6 +35,9 @@ type Worker struct { ...@@ -35,6 +35,9 @@ type Worker struct {
resultCh chan *omanager.SubmitTaskResult resultCh chan *omanager.SubmitTaskResult
uuid int64 uuid int64
publicKey string publicKey string
addr string
status []byte
usageInfo []*omanager.DeviceUsage
deviceInfo []*omanager.DeviceInfo deviceInfo []*omanager.DeviceInfo
recentTask *lru.Cache recentTask *lru.Cache
stream omanager.NodeManagerService_RegisterWorkerServer stream omanager.NodeManagerService_RegisterWorkerServer
...@@ -123,8 +126,9 @@ func (wm *WorkerManager) manageWorker(worker *Worker) error { ...@@ -123,8 +126,9 @@ func (wm *WorkerManager) manageWorker(worker *Worker) error {
log.WithField("worker", worker.uuid).Info("start manage worker") log.WithField("worker", worker.uuid).Info("start manage worker")
defer log.WithField("worker", worker.uuid).Info("exit manage worker") defer log.WithField("worker", worker.uuid).Info("exit manage worker")
initialInterval := time.Second * 4
heartBeatDuration := time.Second * 10 tickerConf := config.GetConfig().Tickers
heartBeatDuration := time.Second * time.Duration(tickerConf.HeartBeat)
workerCheckDuration := heartBeatDuration * 3 workerCheckDuration := heartBeatDuration * 3
heartBeatTicker := time.NewTicker(heartBeatDuration) heartBeatTicker := time.NewTicker(heartBeatDuration)
...@@ -133,13 +137,13 @@ func (wm *WorkerManager) manageWorker(worker *Worker) error { ...@@ -133,13 +137,13 @@ func (wm *WorkerManager) manageWorker(worker *Worker) error {
workerCheckTicker := time.NewTicker(workerCheckDuration) workerCheckTicker := time.NewTicker(workerCheckDuration)
defer workerCheckTicker.Stop() defer workerCheckTicker.Stop()
statusTicker := time.NewTicker(time.Second * 10) statusTicker := time.NewTicker(initialInterval)
defer statusTicker.Stop() defer statusTicker.Stop()
deviceInfoTicker := time.NewTicker(time.Second * 10) deviceInfoTicker := time.NewTicker(initialInterval)
defer deviceInfoTicker.Stop() defer deviceInfoTicker.Stop()
deviceUsageTicker := time.NewTicker(time.Second * 10) deviceUsageTicker := time.NewTicker(initialInterval)
defer deviceUsageTicker.Stop() defer deviceUsageTicker.Stop()
defer wm.InActiveWorker(worker) defer wm.InActiveWorker(worker)
...@@ -160,6 +164,15 @@ func (wm *WorkerManager) manageWorker(worker *Worker) error { ...@@ -160,6 +164,15 @@ func (wm *WorkerManager) manageWorker(worker *Worker) error {
return nil return nil
case <-workerCheckTicker.C: case <-workerCheckTicker.C:
if worker.deviceInfo != nil && worker.addr != "" {
deviceInfoTicker.Reset(time.Second * time.Duration(tickerConf.DeviceInfoTicker))
}
if worker.status != nil {
statusTicker.Reset(time.Second * time.Duration(tickerConf.StatusTicker))
}
if worker.usageInfo != nil {
deviceUsageTicker.Reset(time.Second * time.Duration(tickerConf.DeviceUsageTicker))
}
if time.Now().Unix()-wm.GetHeartBeat(worker.uuid) > int64(workerCheckDuration.Seconds()) { if time.Now().Unix()-wm.GetHeartBeat(worker.uuid) > int64(workerCheckDuration.Seconds()) {
wm.InActiveWorker(worker) wm.InActiveWorker(worker)
// todo: remove worker // todo: remove worker
...@@ -178,9 +191,7 @@ func (wm *WorkerManager) manageWorker(worker *Worker) error { ...@@ -178,9 +191,7 @@ func (wm *WorkerManager) manageWorker(worker *Worker) error {
deviceInfo.DeviceRequest = &omanager.DeviceInfoRequest{} deviceInfo.DeviceRequest = &omanager.DeviceInfoRequest{}
msg.Message = deviceInfo msg.Message = deviceInfo
callback = func(err error) bool { callback = func(err error) bool {
if err == nil {
deviceInfoTicker.Reset(time.Second * 180)
}
return true return true
} }
...@@ -189,9 +200,7 @@ func (wm *WorkerManager) manageWorker(worker *Worker) error { ...@@ -189,9 +200,7 @@ func (wm *WorkerManager) manageWorker(worker *Worker) error {
deviceUsage.DeviceUsage = &omanager.DeviceUsageRequest{} deviceUsage.DeviceUsage = &omanager.DeviceUsageRequest{}
msg.Message = deviceUsage msg.Message = deviceUsage
callback = func(err error) bool { callback = func(err error) bool {
if err == nil {
deviceUsageTicker.Reset(time.Second * 180)
}
return true return true
} }
...@@ -200,9 +209,7 @@ func (wm *WorkerManager) manageWorker(worker *Worker) error { ...@@ -200,9 +209,7 @@ func (wm *WorkerManager) manageWorker(worker *Worker) error {
status.StatusRequest = &omanager.StatusRequest{} status.StatusRequest = &omanager.StatusRequest{}
msg.Message = status msg.Message = status
callback = func(err error) bool { callback = func(err error) bool {
if err == nil {
statusTicker.Reset(time.Second * 120)
}
return true return true
} }
...@@ -342,11 +349,21 @@ func (wm *WorkerManager) handleWorkerMsg(worker *Worker) { ...@@ -342,11 +349,21 @@ func (wm *WorkerManager) handleWorkerMsg(worker *Worker) {
log.WithFields(log.Fields{ log.WithFields(log.Fields{
"worker": worker.uuid, "worker": worker.uuid,
}).Debugf("receive worker device info:%v", msg.DeviceInfo.Devices) }).Debugf("receive worker device info:%v", msg.DeviceInfo.Devices)
if worker.deviceInfo == nil { if worker.deviceInfo == nil || worker.addr == "" {
// first time receive device info // first time receive device info
worker.publicKey = msg.DeviceInfo.MinerPubkey worker.publicKey = msg.DeviceInfo.MinerPubkey
worker.deviceInfo = msg.DeviceInfo.Devices worker.deviceInfo = msg.DeviceInfo.Devices
wm.AddWorker(worker) if pubkey, err := utils.HexToPubkey(worker.publicKey); err != nil {
log.WithFields(log.Fields{
"worker": worker.uuid,
"error": err,
}).Error("parse pubkey failed")
} else {
worker.addr = utils.PubkeyToAddress(pubkey)
}
if worker.addr != "" {
wm.AddWorker(worker)
}
} }
case *omanager.WorkerMessage_DeviceUsage: case *omanager.WorkerMessage_DeviceUsage:
......
...@@ -11,24 +11,21 @@ func (wm *WorkerManager) AddWorker(worker *Worker) error { ...@@ -11,24 +11,21 @@ func (wm *WorkerManager) AddWorker(worker *Worker) error {
// add device to redis // add device to redis
priority := 0 priority := 0
_ = device // todo: set priority with device info. _ = device // todo: set priority with device info.
if err := wm.rdb.LPush(context.Background(), config.WORKER_QUEUE_PREFIX+strconv.Itoa(priority), worker.uuid).Err(); err != nil { if err := wm.rdb.LPush(context.Background(), config.WORKER_QUEUE_PREFIX+strconv.Itoa(priority), worker.addr).Err(); err != nil {
continue continue
} }
} }
// add worker to redis queue // add worker to redis queue
if err := wm.rdb.SAdd(context.Background(), config.WORKER_STATUS_PREFIX+strconv.FormatInt(worker.uuid, 10), config.GetConfig().Endpoint).Err(); err != nil { if err := wm.rdb.SAdd(context.Background(), config.WORKER_STATUS_PREFIX+worker.addr, config.GetConfig().Endpoint).Err(); err != nil {
return err return err
} }
//wm.rdb.Set(context.Background(), config.WORKER_STATUS_PREFIX+strconv.FormatInt(worker.uuid, 10), odysseus.WorkerStatus_WorkerStatusActive, 0)
return nil return nil
} }
func (wm *WorkerManager) ActiveWorker(worker *Worker) { func (wm *WorkerManager) ActiveWorker(worker *Worker) {
wm.rdb.SAdd(context.Background(), config.WORKER_STATUS_PREFIX+strconv.FormatInt(worker.uuid, 10), config.GetConfig().Endpoint) wm.rdb.SAdd(context.Background(), config.WORKER_STATUS_PREFIX+worker.addr, config.GetConfig().Endpoint)
//wm.rdb.Set(context.Background(), config.WORKER_STATUS_PREFIX+strconv.FormatInt(worker.uuid, 10), odysseus.WorkerStatus_WorkerStatusActive, 0)
} }
func (wm *WorkerManager) InActiveWorker(worker *Worker) { func (wm *WorkerManager) InActiveWorker(worker *Worker) {
wm.rdb.SRem(context.Background(), config.WORKER_STATUS_PREFIX+strconv.FormatInt(worker.uuid, 10), config.GetConfig().Endpoint) wm.rdb.SRem(context.Background(), config.WORKER_STATUS_PREFIX+worker.addr, config.GetConfig().Endpoint)
//wm.rdb.Set(context.Background(), config.WORKER_STATUS_PREFIX+strconv.FormatInt(worker.uuid, 10), odysseus.WorkerStatus_WorkerStatusInActive, 0)
} }
...@@ -4,6 +4,7 @@ import ( ...@@ -4,6 +4,7 @@ import (
"crypto/ecdsa" "crypto/ecdsa"
"encoding/hex" "encoding/hex"
"github.com/ethereum/go-ethereum/crypto" "github.com/ethereum/go-ethereum/crypto"
"strings"
) )
func HexToPrivatekey(key string) (*ecdsa.PrivateKey, error) { func HexToPrivatekey(key string) (*ecdsa.PrivateKey, error) {
...@@ -28,6 +29,9 @@ func PubkeyToHex(key *ecdsa.PublicKey) string { ...@@ -28,6 +29,9 @@ func PubkeyToHex(key *ecdsa.PublicKey) string {
} }
func HexToPubkey(key string) (*ecdsa.PublicKey, error) { func HexToPubkey(key string) (*ecdsa.PublicKey, error) {
if strings.HasPrefix(key, "0x") {
key = key[2:]
}
pub, err := hex.DecodeString(key) pub, err := hex.DecodeString(key)
if err != nil { if err != nil {
return nil, err return nil, err
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment