Commit b6f1a32a authored by vicotor's avatar vicotor

update worker manager

parent 4827c9fc
......@@ -3,8 +3,10 @@ package config
const (
NODE_MANAGER_SET = "node_manager_set"
WORKER_STATUS_PREFIX = "worker_status_"
WORKER_NONCE_KEY_PREFIX = "worker_nonce_"
WORKER_QUEUE_PREFIX = "worker_queue_"
WORKER_DEVICE_INFO_PREFIX = "worker_device_info_"
WORKER_DEVICE_STATUS_PREFIX = "worker_device_status_"
WORKER_USAGE_INFO_PREFIX = "worker_usage_info_"
WORKER_RESOURCE_INFO_PREFIX = "worker_resource_info_"
)
......@@ -33,6 +33,8 @@ func (n *NodeManagerService) ManagerList(ctx context.Context, request *omanager.
}
func (n *NodeManagerService) RegisterWorker(client omanager.NodeManagerService_RegisterWorkerServer) error {
//return n.node.wm.handleNewDial(client)
uuid := utils.GetSnowflakeId()
worker, err := n.node.wm.AddNewWorker(uuid, client)
......
......@@ -9,6 +9,7 @@ import (
omanager "github.com/odysseus/odysseus-protocol/gen/proto/go/nodemanager/v1"
log "github.com/sirupsen/logrus"
"math/big"
"time"
)
func (wm *WorkerManager) taskResult(worker *Worker, task *odysseus.TaskContent, result *omanager.SubmitTaskResult) (*omanager.ManagerMessage_ProofTaskResult, error) {
......@@ -17,13 +18,15 @@ func (wm *WorkerManager) taskResult(worker *Worker, task *odysseus.TaskContent,
return wm.computeTaskResult(worker, task, result)
case odysseus.TaskKind_StandardTask:
return wm.standardTaskResult(worker, task, result)
}
return nil, errors.New("unsupport task kind")
}
func (wm *WorkerManager) computeTaskResult(worker *Worker, task *odysseus.TaskContent, result *omanager.SubmitTaskResult) (*omanager.ManagerMessage_ProofTaskResult, error) {
if worker.info.nodeInfo == nil {
return nil, errors.New("unknown worker node info")
}
log.WithFields(log.Fields{
"task-id": task.TaskId,
"task-type": task.TaskType,
......@@ -77,7 +80,7 @@ func (wm *WorkerManager) computeTaskResult(worker *Worker, task *odysseus.TaskCo
paramHash := crypto.Keccak256Hash(task.TaskParam)
resultHash := crypto.Keccak256Hash(result.TaskResultBody)
dataHash := crypto.Keccak256Hash(utils.CombineBytes([]byte(result.TaskId), paramHash[:], resultHash[:]))
minerPubkey, _ := utils.HexToPubkey(worker.publicKey) // todo: get miner pubkey
minerPubkey, _ := utils.HexToPubkey(worker.info.nodeInfo.MinerPubkey)
verified := ecdsa.VerifyASN1(minerPubkey, dataHash[:], result.MinerSignature)
log.WithField("minerSignatureVerify", verified).Debug("miner signature verify")
if !verified {
......@@ -88,11 +91,13 @@ func (wm *WorkerManager) computeTaskResult(worker *Worker, task *odysseus.TaskCo
receipt := wm.makeReceipt(worker, task, result, Succeed)
wm.node.PostResult(receipt)
//manager_signature = sign(hash((task_id+hash(task_param)+hash(task_result)+container_signature+miner_signature+workload))
//manager_signature = sign(hash((task_id+hash(task_param)+hash(task_result)+container_signature+miner_signature+workload+time))
now := time.Now().Unix()
paramHash := crypto.Keccak256Hash(task.TaskParam)
resultHash := crypto.Keccak256Hash(result.TaskResultBody)
dataHash := crypto.Keccak256Hash(utils.CombineBytes([]byte(result.TaskId), paramHash[:], resultHash[:],
worker.ProfitAccount().Bytes(), worker.WorkerAccount().Bytes(), result.ContainerSignature, result.MinerSignature, big.NewInt(int64(task.TaskWorkload)).Bytes()))
worker.ProfitAccount().Bytes(), worker.WorkerAccount().Bytes(), result.ContainerSignature, result.MinerSignature, big.NewInt(int64(task.TaskWorkload)).Bytes()),
big.NewInt(now).Bytes())
signature, err := wm.node.Sign(dataHash[:])
if err != nil {
......@@ -104,6 +109,7 @@ func (wm *WorkerManager) computeTaskResult(worker *Worker, task *odysseus.TaskCo
proof.ProofTaskResult = &omanager.ProofTaskResult{
TaskId: result.TaskId,
ManagerSignature: signature,
Timestamp: uint64(now),
Workload: uint64(task.TaskWorkload),
ContainerPubkey: utils.CombineBytes(task.ContainerPubkey),
}
......@@ -115,6 +121,9 @@ func (wm *WorkerManager) computeTaskResult(worker *Worker, task *odysseus.TaskCo
}
func (wm *WorkerManager) standardTaskResult(worker *Worker, task *odysseus.TaskContent, result *omanager.SubmitTaskResult) (*omanager.ManagerMessage_ProofTaskResult, error) {
if worker.info.nodeInfo == nil {
return nil, errors.New("unknown worker node info")
}
log.WithFields(log.Fields{
"task-id": task.TaskId,
"task-type": task.TaskType,
......@@ -160,7 +169,7 @@ func (wm *WorkerManager) standardTaskResult(worker *Worker, task *odysseus.TaskC
paramHash := crypto.Keccak256Hash(task.TaskParam)
resultHash := crypto.Keccak256Hash(result.TaskResultBody)
dataHash := crypto.Keccak256Hash(utils.CombineBytes([]byte(result.TaskId), paramHash[:], resultHash[:]))
minerPubkey, _ := utils.HexToPubkey(worker.publicKey) // todo: get miner pubkey
minerPubkey, _ := utils.HexToPubkey(worker.info.nodeInfo.MinerPubkey)
verified := ecdsa.VerifyASN1(minerPubkey, dataHash[:], result.MinerSignature)
log.WithField("minerSignatureVerify", verified).Debug("miner signature verify")
if !verified {
......@@ -171,11 +180,13 @@ func (wm *WorkerManager) standardTaskResult(worker *Worker, task *odysseus.TaskC
receipt := wm.makeReceipt(worker, task, result, Succeed)
wm.node.PostResult(receipt)
//manager_signature = sign(hash((task_id+hash(task_param)+hash(task_result)+container_signature+miner_signature+workload))
now := time.Now().Unix()
//manager_signature = sign(hash((task_id+hash(task_param)+hash(task_result)+container_signature+miner_signature+workload+time))
paramHash := crypto.Keccak256Hash(task.TaskParam)
resultHash := crypto.Keccak256Hash(result.TaskResultBody)
dataHash := crypto.Keccak256Hash(utils.CombineBytes([]byte(result.TaskId), paramHash[:], resultHash[:],
worker.ProfitAccount().Bytes(), worker.WorkerAccount().Bytes(), result.ContainerSignature, result.MinerSignature, big.NewInt(int64(task.TaskWorkload)).Bytes()))
worker.ProfitAccount().Bytes(), worker.WorkerAccount().Bytes(), result.ContainerSignature, result.MinerSignature, big.NewInt(int64(task.TaskWorkload)).Bytes()),
big.NewInt(now).Bytes())
signature, err := wm.node.Sign(dataHash[:])
if err != nil {
......@@ -187,6 +198,7 @@ func (wm *WorkerManager) standardTaskResult(worker *Worker, task *odysseus.TaskC
proof.ProofTaskResult = &omanager.ProofTaskResult{
TaskId: result.TaskId,
ManagerSignature: signature,
Timestamp: uint64(now),
Workload: uint64(task.TaskWorkload),
ContainerPubkey: utils.CombineBytes(task.ContainerPubkey),
}
......
......@@ -27,6 +27,7 @@ var (
Succeed = errors.New("succeed")
ErrWorkerExist = errors.New("worker exist")
ErrHeartBeatExpired = errors.New("worker heartbeat expired")
ErrInvalidMessageValue = errors.New("invalid message value")
)
type dispatchTask struct {
......@@ -34,30 +35,44 @@ type dispatchTask struct {
errCh chan error
}
type workerInfo struct {
nodeInfo *omanager.NodeInfoResponse
deviceUsageInfo []*omanager.DeviceUsage
deviceInfo *omanager.DeviceInfoMessage
deviceStatusInfo *omanager.StatusResponse
resourceInfo *omanager.SubmitResourceMap
}
type Worker struct {
quit chan interface{}
taskCh chan *dispatchTask
resultCh chan *omanager.SubmitTaskResult
uuid int64
publicKey string
addr string
benefitAddr string
status []byte
uuid int64 // worker uuid in the local.
registed bool // worker is registed to this nm.
online bool
usageInfo []*omanager.DeviceUsage
deviceInfo *omanager.DeviceInfoResponse
nonce int
latestNmValue string
addFirstSucceed bool
info workerInfo
workerAddr string // worker address from public-key
deviceInfoHash []byte
recentTask *lru.Cache
state string
status string
stream omanager.NodeManagerService_RegisterWorkerServer
}
func (w *Worker) ProfitAccount() common.Address {
return common.HexToAddress(w.benefitAddr)
if w.info.nodeInfo != nil {
return common.HexToAddress(w.info.nodeInfo.BenefitAddress)
}
return common.Address{}
}
func (w *Worker) WorkerAccount() common.Address {
return common.HexToAddress(w.addr)
return common.HexToAddress(w.workerAddr)
}
type WorkerManager struct {
......@@ -65,6 +80,7 @@ type WorkerManager struct {
heartBeat map[int64]int64
hbRwLock sync.RWMutex
workerByIp sync.Map
workers map[int64]*Worker
workid map[string]*Worker
workerReg map[int64]*registry.Registry
......@@ -130,7 +146,7 @@ func (wm *WorkerManager) SetWorkerAddr(worker *Worker, addr string) {
wm.wkRwLock.Lock()
defer wm.wkRwLock.Unlock()
worker.addr = addr
worker.workerAddr = addr
wm.workid[addr] = worker
}
......@@ -141,26 +157,42 @@ func (wm *WorkerManager) GetWorkerByAddr(addr string) *Worker {
return wm.workid[addr]
}
func (wm *WorkerManager) AddNewWorker(uuid int64, worker omanager.NodeManagerService_RegisterWorkerServer) (*Worker, error) {
func (wm *WorkerManager) GetWorkerById(id int64) *Worker {
wm.wkRwLock.RLock()
defer wm.wkRwLock.RUnlock()
return wm.workers[id]
}
func (wm *WorkerManager) AddNewWorker(id int64, worker omanager.NodeManagerService_RegisterWorkerServer) (*Worker, error) {
wm.wkRwLock.Lock()
defer wm.wkRwLock.Unlock()
if _, exist := wm.workers[uuid]; exist {
if _, exist := wm.workers[id]; exist {
return nil, ErrWorkerExist
}
w := &Worker{
quit: make(chan interface{}),
taskCh: make(chan *dispatchTask),
resultCh: make(chan *omanager.SubmitTaskResult),
uuid: uuid,
uuid: id,
registed: false,
online: false,
info: workerInfo{},
workerAddr: "",
deviceInfoHash: nil,
status: "",
stream: worker,
quit: make(chan interface{}),
}
taskCache, err := lru.New(100)
if err != nil {
return nil, err
}
w.recentTask = taskCache
wm.workers[uuid] = w
wm.workers[id] = w
go wm.handleWorkerMsg(w)
return w, nil
......@@ -183,6 +215,19 @@ func (wm *WorkerManager) doCallback(hook string, response *odysseus.TaskResponse
}
}
func (wm *WorkerManager) disconnect(worker *Worker) {
worker.online = false
worker.status = "disconnected"
wm.InActiveWorker(worker)
if worker.registed {
wm.StopRegistry(worker.uuid)
}
wm.wkRwLock.Lock()
delete(wm.workers, worker.uuid)
delete(wm.workid, worker.workerAddr)
}
func (wm *WorkerManager) manageWorker(worker *Worker) error {
log.WithField("worker", worker.uuid).Info("start manage worker")
......@@ -197,38 +242,27 @@ func (wm *WorkerManager) manageWorker(worker *Worker) error {
heartBeatTicker := time.NewTicker(initialHeartBeatInterval)
defer heartBeatTicker.Stop()
nodeinfoTicker := time.NewTicker(initialHeartBeatInterval)
defer nodeinfoTicker.Stop()
workerCheckTicker := time.NewTicker(workerCheckDuration)
defer workerCheckTicker.Stop()
statusTicker := time.NewTicker(initialInterval)
defer statusTicker.Stop()
deviceInfoTicker := time.NewTicker(initialInterval)
defer deviceInfoTicker.Stop()
deviceUsageTicker := time.NewTicker(initialInterval)
defer deviceUsageTicker.Stop()
reg := registry.NewRegistry(registry.RedisConnParam{
Addr: config.GetConfig().Redis.Addr,
Password: config.GetConfig().Redis.Password,
DbIndex: config.GetConfig().Redis.DbIndex,
}, workerRegistry{worker: worker, wm: wm})
wm.SetWorkerRegistry(worker.uuid, reg)
worker.state = "connected"
go reg.Start()
worker.status = "connected"
defer func() {
log.WithFields(log.Fields{
"worker-addr": worker.addr,
"worker-addr": worker.workerAddr,
"worker-uuid": worker.uuid,
}).Info("exit manage worker")
worker.online = false
worker.state = "disconnected"
wm.InActiveWorker(worker)
wm.StopRegistry(worker.uuid)
wm.disconnect(worker)
}()
for {
......@@ -243,25 +277,33 @@ func (wm *WorkerManager) manageWorker(worker *Worker) error {
gb := new(omanager.ManagerMessage_GoodbyeMessage)
gb.GoodbyeMessage = &omanager.GoodbyeMessage{}
msg.Message = gb
case <-worker.quit:
return nil
case <-workerCheckTicker.C:
if worker.deviceInfo != nil && worker.addr != "" {
deviceInfoTicker.Reset(time.Second * time.Duration(tickerConf.DeviceInfoTicker))
if worker.info.nodeInfo != nil {
nodeinfoTicker.Reset(time.Hour * 24)
}
if worker.status != nil {
if worker.info.deviceStatusInfo != nil {
statusTicker.Reset(time.Second * time.Duration(tickerConf.StatusTicker))
}
if worker.usageInfo != nil {
if worker.info.deviceUsageInfo != nil {
deviceUsageTicker.Reset(time.Second * time.Duration(tickerConf.DeviceUsageTicker))
}
if time.Now().Unix()-wm.GetHeartBeat(worker.uuid) > int64(workerCheckDuration.Seconds()) {
wm.InActiveWorker(worker)
// todo: remove worker
close(worker.quit)
return ErrHeartBeatExpired
}
if worker.registed && worker.addFirstSucceed == false && len(worker.deviceInfoHash) == 0 {
wm.AddWorkerToQueue(worker)
}
wm.UpdateWorkerActive(worker)
case <-heartBeatTicker.C:
hb := new(omanager.ManagerMessage_HeartbeatRequest)
hb.HeartbeatRequest = &omanager.HeartbeatRequest{
......@@ -274,30 +316,35 @@ func (wm *WorkerManager) manageWorker(worker *Worker) error {
return true
}
case <-deviceInfoTicker.C:
deviceInfo := new(omanager.ManagerMessage_DeviceRequest)
deviceInfo.DeviceRequest = &omanager.DeviceInfoRequest{}
msg.Message = deviceInfo
case <-nodeinfoTicker.C:
nodeinfo := new(omanager.ManagerMessage_NodeInfoRequest)
nodeinfo.NodeInfoRequest = &omanager.NodeInfoRequest{}
msg.Message = nodeinfo
callback = func(err error) bool {
return true
}
case <-deviceUsageTicker.C:
// if worker is not registed to me, ignore device usage info.
if !worker.registed {
continue
}
deviceUsage := new(omanager.ManagerMessage_DeviceUsage)
deviceUsage.DeviceUsage = &omanager.DeviceUsageRequest{}
msg.Message = deviceUsage
callback = func(err error) bool {
return true
}
case <-statusTicker.C:
// if worker is not registed to me, ignore device status info.
if !worker.registed {
continue
}
status := new(omanager.ManagerMessage_StatusRequest)
status.StatusRequest = &omanager.StatusRequest{}
msg.Message = status
callback = func(err error) bool {
return true
}
......@@ -374,8 +421,8 @@ func (wm *WorkerManager) manageWorker(worker *Worker) error {
}
func (wm *WorkerManager) handleWorkerMsg(worker *Worker) {
l := log.WithField("worker-uuid", worker.uuid)
l.WithField("worker-addr", worker.addr).Info("start handle worker message")
defer l.WithField("worker-addr", worker.addr).Info("exit handle worker message")
l.WithField("worker-addr", worker.workerAddr).Info("start handle worker message")
defer l.WithField("worker-addr", worker.workerAddr).Info("exit handle worker message")
for {
select {
case <-wm.quit:
......@@ -385,41 +432,88 @@ func (wm *WorkerManager) handleWorkerMsg(worker *Worker) {
default:
wmsg, err := worker.stream.Recv()
if err != nil {
l.WithError(err).WithField("worker-addr", worker.addr).Error("recv msg failed")
l.WithError(err).WithField("worker-addr", worker.workerAddr).Error("recv msg failed")
close(worker.quit)
return
}
worker.online = true
switch msg := wmsg.Message.(type) {
case *omanager.WorkerMessage_GoodbyeMessage:
worker.online = false
worker.quit <- msg.GoodbyeMessage.Reason
close(worker.taskCh)
return
case *omanager.WorkerMessage_SubmitTaskResult:
worker.resultCh <- msg.SubmitTaskResult
case *omanager.WorkerMessage_HeartbeatResponse:
worker.online = true
wm.UpdateHeartBeat(worker.uuid)
l.WithFields(log.Fields{
"worker-addr": worker.addr,
"worker-addr": worker.workerAddr,
"hearBeat": time.Now().Unix() - int64(msg.HeartbeatResponse.Timestamp),
}).Debug("receive worker heartbeat")
case *omanager.WorkerMessage_NodeInfo:
worker.info.nodeInfo = msg.NodeInfo
var addr = ""
if pubkey, err := utils.HexToPubkey(msg.NodeInfo.MinerPubkey); err != nil {
l.WithFields(log.Fields{
"worker-addr": worker.workerAddr,
"error": err,
}).Error("parse pubkey failed")
} else {
addr = utils.PubkeyToAddress(pubkey)
}
if addr == worker.workerAddr || addr == "" {
// addr is not change.
continue
}
// checkout addr exist.
if worker.workerAddr == "" {
if w := wm.GetWorkerByAddr(addr); w != nil {
log.WithField("worker-addr", addr).Error("worker with the address is existed")
close(worker.quit)
return
}
}
if worker.workerAddr != "" {
// todo: worker change pubkey.
wm.InActiveWorker(worker)
}
// update new worker.
wm.SetWorkerAddr(worker, addr)
case *omanager.WorkerMessage_Status:
// todo: store worker status
worker.status = msg.Status.DeviceStatus
if !worker.registed {
continue
}
worker.info.deviceStatusInfo = msg.Status
l.WithFields(log.Fields{
"worker-addr": worker.addr,
"worker-addr": worker.workerAddr,
}).Debugf("receive worker status:0x%x", msg.Status.DeviceStatus)
wm.UpdateWorkerDeviceStatusInfo(worker, msg.Status.DeviceStatus)
case *omanager.WorkerMessage_ResourceMap:
// todo: store worker resource map.
if !worker.registed {
continue
}
l.WithFields(log.Fields{
"worker-addr": worker.addr,
"worker-addr": worker.workerAddr,
}).Debugf("receive worker resource map:%v", msg.ResourceMap)
wm.UpdateWorkerResourceInfo(worker, msg.ResourceMap.ResourceMap)
case *omanager.WorkerMessage_FetchStandardTask:
if worker.info.nodeInfo == nil {
continue
}
l.WithFields(log.Fields{
"worker-addr": worker.addr,
"worker-addr": worker.workerAddr,
}).Debugf("receive worker fetch std task request:%v", msg.FetchStandardTask.TaskType)
pushTask := standardlib.StdTask{}
task, exist := wm.std.GetTask(msg.FetchStandardTask.TaskType)
......@@ -453,64 +547,78 @@ func (wm *WorkerManager) handleWorkerMsg(worker *Worker) {
case *omanager.WorkerMessage_DeviceInfo:
l.WithFields(log.Fields{
"worker-addr": worker.addr,
"worker-addr": worker.workerAddr,
}).Debugf("receive worker device info:%v", msg.DeviceInfo)
if !worker.registed {
// ignore the info.
continue
}
{
// receive device info
worker.online = true
worker.publicKey = msg.DeviceInfo.MinerPubkey
worker.deviceInfo = msg.DeviceInfo
worker.benefitAddr = msg.DeviceInfo.BenefitAddress
var addr = ""
if pubkey, err := utils.HexToPubkey(worker.publicKey); err != nil {
var infoHash [32]byte
infoData, err := json.Marshal(msg.DeviceInfo)
if err != nil {
l.WithFields(log.Fields{
"worker-addr": worker.addr,
"worker-addr": worker.workerAddr,
"error": err,
}).Error("parse pubkey failed")
} else {
addr = utils.PubkeyToAddress(pubkey)
}).Error("marshal device info failed")
}
if addr == worker.addr {
// addr is not change.
if len(infoData) == 0 {
continue
}
if worker.addr != "" {
wm.InActiveWorker(worker)
infoHash = sha3.Sum256(infoData)
if worker.registed && worker.addFirstSucceed == false && len(worker.deviceInfoHash) == 0 {
wm.AddWorkerToQueue(worker)
}
worker.addr = addr
if worker.addr != "" {
infoData, err := json.Marshal(msg.DeviceInfo)
if err != nil {
l.WithFields(log.Fields{
"worker-addr": worker.addr,
"error": err,
}).Error("marshal device info failed")
} else if len(infoData) > 0 {
infoHash := sha3.Sum256(infoData)
// check device info changed, and update to cache.
if bytes.Compare(infoHash[:], worker.deviceInfoHash) != 0 {
wm.UpdateWorkerDeviceInfo(worker, string(infoData))
}
worker.deviceInfoHash = infoHash[:]
}
wm.AddWorkerFirst(worker)
wm.SetWorkerAddr(worker, worker.addr)
}
worker.info.deviceInfo = msg.DeviceInfo
}
case *omanager.WorkerMessage_DeviceUsage:
// todo: handler worker device usage
if !worker.registed {
continue
}
usageData, _ := json.Marshal(msg.DeviceUsage)
wm.UpdateWorkerDeviceInfo(worker, string(usageData))
worker.usageInfo = msg.DeviceUsage.Usage
worker.info.deviceUsageInfo = msg.DeviceUsage.Usage
l.WithFields(log.Fields{
"worker-addr": worker.addr,
"worker-addr": worker.workerAddr,
}).Debugf("receive worker device usage:%v", msg.DeviceUsage.Usage)
case *omanager.WorkerMessage_RegisteMessage:
if worker.registed {
continue
}
worker.registed = true
if pubkey, err := utils.HexToPubkey(msg.RegisteMessage.MinerPubkey); err != nil {
l.WithFields(log.Fields{
"worker-addr": worker.workerAddr,
"error": err,
}).Error("parse pubkey failed")
} else {
worker.workerAddr = utils.PubkeyToAddress(pubkey)
}
reg := registry.NewRegistry(registry.RedisConnParam{
Addr: config.GetConfig().Redis.Addr,
Password: config.GetConfig().Redis.Password,
DbIndex: config.GetConfig().Redis.DbIndex,
}, workerRegistry{
worker: worker,
wm: wm,
})
go reg.Start()
wm.SetWorkerRegistry(worker.uuid, reg)
default:
l.WithField("worker-addr", worker.addr).Error(fmt.Sprintf("unsupport msg type %T", msg))
l.WithField("worker-addr", worker.workerAddr).Error(fmt.Sprintf("unsupport msg type %T", msg))
}
}
}
......
......@@ -22,7 +22,7 @@ func (w workerRegistry) ServiceType() common.ServiceType {
}
func (w workerRegistry) Instance() string {
return fmt.Sprintf("%s", w.worker.addr)
return fmt.Sprintf("%s", w.worker.workerAddr)
}
func (w workerRegistry) Status() string {
......@@ -33,17 +33,18 @@ func (w workerRegistry) DetailInfo() (json.RawMessage, error) {
if w.worker == nil {
return nil, fmt.Errorf("worker is nil")
}
if w.worker.addr == "" {
if w.worker.workerAddr == "" {
return nil, fmt.Errorf("worker address is empty")
}
info := query.WorkerInfo{}
info.BenefitAddress = w.worker.benefitAddr
if w.worker.deviceInfo != nil {
info.IP = w.worker.deviceInfo.DeviceIps[0]
if w.worker.info.nodeInfo != nil {
info.BenefitAddress = w.worker.info.nodeInfo.BenefitAddress
info.IP = w.worker.info.nodeInfo.DeviceIp
}
info.ActiveNM, _ = w.wm.WorkerNmList(w.worker)
info.HearBeat = w.wm.GetHeartBeat(w.worker.uuid)
info.MinerAddress = w.worker.addr
info.MinerAddress = w.worker.workerAddr
return json.Marshal(info)
}
......@@ -3,12 +3,20 @@ package server
import (
"context"
"encoding/hex"
"errors"
"fmt"
"github.com/gomodule/redigo/redis"
"github.com/odysseus/nodemanager/config"
log "github.com/sirupsen/logrus"
"strconv"
"strings"
"time"
)
func (wm *WorkerManager) UpdateWorkerDeviceStatusInfo(worker *Worker, status []byte) {
wm.rdb.Set(context.Background(), workerDeviceStatusInfoKey(worker), status, 0)
}
func (wm *WorkerManager) UpdateWorkerUsageInfo(worker *Worker, usageInfo string) {
wm.rdb.Set(context.Background(), workerUsageInfoKey(worker), usageInfo, 0)
}
......@@ -22,24 +30,84 @@ func (wm *WorkerManager) UpdateWorkerResourceInfo(worker *Worker, resourceInfo [
wm.rdb.Set(context.Background(), workerResourceInfoKey(worker), rstr, 0)
}
func (wm *WorkerManager) UpdateWorkerNonce(worker *Worker, nonce int) error {
return wm.rdb.Set(context.Background(), workerNonceKey(worker), nonce, 0).Err()
}
func (wm *WorkerManager) GetWorkerNonce(worker *Worker) (int, error) {
if worker.workerAddr != "" {
nonceK := workerNonceKey(worker)
nonce, err := wm.rdb.Get(context.Background(), nonceK).Int()
if err == redis.ErrNil {
nonce = 1
if err = wm.rdb.Set(context.Background(), nonceK, nonce, 0).Err(); err != nil {
return 0, err
}
}
return nonce, nil
}
return 0, errors.New("unkown worker node info")
}
func (wm *WorkerManager) IncrWorkerNonce(worker *Worker) (int, error) {
nonce, err := wm.rdb.Incr(context.Background(), workerNonceKey(worker)).Uint64()
return int(nonce), err
}
func (wm *WorkerManager) AddWorkerFirst(worker *Worker) error {
log.WithField("worker", worker.addr).Info("add worker first time.")
for _, device := range worker.deviceInfo.Devices {
log.WithField("worker", worker.workerAddr).Info("add worker first time.")
wm.UpdateWorkerActive(worker)
for _, device := range worker.info.deviceInfo.Devices {
if !strings.HasPrefix(device.DeviceType, "gpu") {
continue
}
// add device to redis
priority := 0
_ = device // todo: set priority with device info.
// add worker to redis queue
if err := wm.rdb.RPush(context.Background(), config.WORKER_QUEUE_PREFIX+strconv.Itoa(priority), workerId(worker)).Err(); err != nil {
continue
}
}
// add worker to redis queue
wm.ActiveWorker(worker)
return nil
}
func (wm *WorkerManager) AddWorkerToQueue(worker *Worker) {
nonce, err := wm.GetWorkerNonce(worker)
if err != nil {
log.WithField("worker-addr", worker.workerAddr).Error("get worker nonce failed when get device info")
} else {
// if statekeys not exist, nonce don't change.
nmlist, err := wm.WorkerNmList(worker)
if err != nil {
if err == redis.ErrNil {
wm.UpdateWorkerActive(worker)
}
} else {
if len(nmlist) == 0 {
// if nmlist is empty, nonce incr.
nonce, err = wm.IncrWorkerNonce(worker)
if err != nil {
log.WithField("worker-addr", worker.workerAddr).Error("incr worker nonce failed when get device info")
}
} else {
// else if nmlist is not empty, clear and add self to it.
wm.rdb.Del(context.Background(), workerStatusKey(worker))
wm.UpdateWorkerActive(worker)
}
}
}
if err == nil {
worker.nonce = nonce
wm.AddWorkerFirst(worker)
worker.addFirstSucceed = true
}
}
func (wm *WorkerManager) AddWorkerSingle(worker *Worker) error {
log.WithField("worker", worker.addr).Info("add worker on back.")
log.WithField("worker", worker.workerAddr).Info("add worker on back.")
wm.UpdateWorkerActive(worker)
{
// add worker to redis queue
priority := 0
......@@ -48,12 +116,45 @@ func (wm *WorkerManager) AddWorkerSingle(worker *Worker) error {
}
}
// add worker to redis queue
wm.ActiveWorker(worker)
return nil
}
func (wm *WorkerManager) ActiveWorker(worker *Worker) {
wm.rdb.SAdd(context.Background(), workerStatusKey(worker), config.GetConfig().PublicEndpoint())
func (wm *WorkerManager) UpdateWorkerActive(worker *Worker) {
if !worker.online {
return
}
nonce, err := wm.GetWorkerNonce(worker)
if err != nil {
return
}
if nonce != worker.nonce {
wm.InActiveWorker(worker)
worker.nonce = nonce
}
old := worker.latestNmValue
if err := wm.activeWorker(worker); err != nil {
return
}
wm.rdb.SRem(context.Background(), workerStatusKey(worker), old)
}
func (wm *WorkerManager) activeWorker(worker *Worker) error {
split := "#"
v := fmt.Sprintf("%s%s%d", config.GetConfig().PublicEndpoint(), split, time.Now().Unix())
worker.latestNmValue = v
return wm.rdb.SAdd(context.Background(), workerStatusKey(worker), v).Err()
}
func (wm *WorkerManager) parseWorkerNmValue(nmValue string) (string, int64) {
split := "#"
strs := strings.Split(nmValue, split)
if len(strs) == 2 {
endpoint := strs[0]
timestamp, _ := strconv.ParseInt(strs[1], 10, 64)
return endpoint, timestamp
}
return "", 0
}
func (wm *WorkerManager) WorkerNmList(worker *Worker) ([]string, error) {
......@@ -61,7 +162,8 @@ func (wm *WorkerManager) WorkerNmList(worker *Worker) ([]string, error) {
}
func (wm *WorkerManager) InActiveWorker(worker *Worker) {
wm.rdb.SRem(context.Background(), workerStatusKey(worker), config.GetConfig().PublicEndpoint())
wm.rdb.SRem(context.Background(), workerStatusKey(worker), worker.latestNmValue)
if list, err := wm.rdb.SMembers(context.Background(), workerStatusKey(worker)).Result(); err == nil && len(list) == 0 {
wm.rdb.Del(context.Background(), workerStatusKey(worker))
wm.rdb.Del(context.Background(), workerUsageInfoKey(worker))
......@@ -71,15 +173,23 @@ func (wm *WorkerManager) InActiveWorker(worker *Worker) {
}
func workerResourceInfoKey(w *Worker) string {
return config.WORKER_RESOURCE_INFO_PREFIX + w.addr
return config.WORKER_RESOURCE_INFO_PREFIX + w.workerAddr
}
func workerDeviceInfoKey(w *Worker) string {
return config.WORKER_DEVICE_INFO_PREFIX + w.addr
return config.WORKER_DEVICE_INFO_PREFIX + w.workerAddr
}
func workerUsageInfoKey(w *Worker) string {
return config.WORKER_USAGE_INFO_PREFIX + w.addr
return config.WORKER_USAGE_INFO_PREFIX + w.workerAddr
}
func workerDeviceStatusInfoKey(w *Worker) string {
return config.WORKER_DEVICE_STATUS_PREFIX + w.workerAddr
}
func workerNonceKey(w *Worker) string {
return config.WORKER_NONCE_KEY_PREFIX + w.workerAddr
}
func workerStatusKey(w *Worker) string {
......@@ -88,5 +198,5 @@ func workerStatusKey(w *Worker) string {
}
func workerId(w *Worker) string {
return fmt.Sprintf("%s_%d", w.addr, w.uuid)
return fmt.Sprintf("%s_%d", w.workerAddr, w.nonce)
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment