Commit 479a0359 authored by vicotor's avatar vicotor

update worker info

parent dd656801
package config package config
const ( const (
NODE_MANAGER_SET = "node_manager_set" NODE_MANAGER_SET = "node_manager_set"
WORKER_STATUS_PREFIX = "worker_status_" WORKER_STATUS_PREFIX = "worker_status_"
WORKER_QUEUE_PREFIX = "worker_queue_" WORKER_QUEUE_PREFIX = "worker_queue_"
WORKER_DEVICE_INFO_PREFIX = "worker_device_info_" WORKER_DEVICE_INFO_PREFIX = "worker_device_info_"
WORKER_USAGE_INFO_PREFIX = "worker_usage_info_" WORKER_USAGE_INFO_PREFIX = "worker_usage_info_"
WORKER_RESOURCE_INFO_PREFIX = "worker_resource_info_"
) )
...@@ -6,6 +6,7 @@ require ( ...@@ -6,6 +6,7 @@ require (
github.com/BurntSushi/toml v1.3.2 github.com/BurntSushi/toml v1.3.2
github.com/IBM/sarama v1.42.1 github.com/IBM/sarama v1.42.1
github.com/astaxie/beego v1.12.3 github.com/astaxie/beego v1.12.3
github.com/docker/docker v25.0.3+incompatible
github.com/ethereum/go-ethereum v1.13.13 github.com/ethereum/go-ethereum v1.13.13
github.com/gogo/protobuf v1.3.2 github.com/gogo/protobuf v1.3.2
github.com/golang/protobuf v1.5.3 github.com/golang/protobuf v1.5.3
......
...@@ -44,6 +44,8 @@ github.com/decred/dcrd/dcrec/secp256k1/v4 v4.0.1 h1:YLtO71vCjJRCBcrPMtQ9nqBsqpA1 ...@@ -44,6 +44,8 @@ github.com/decred/dcrd/dcrec/secp256k1/v4 v4.0.1 h1:YLtO71vCjJRCBcrPMtQ9nqBsqpA1
github.com/decred/dcrd/dcrec/secp256k1/v4 v4.0.1/go.mod h1:hyedUtir6IdtD/7lIxGeCxkaw7y45JueMRL4DIyJDKs= github.com/decred/dcrd/dcrec/secp256k1/v4 v4.0.1/go.mod h1:hyedUtir6IdtD/7lIxGeCxkaw7y45JueMRL4DIyJDKs=
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f h1:lO4WD4F/rVNCu3HqELle0jiPLLBs70cWOduZpkS1E78= github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f h1:lO4WD4F/rVNCu3HqELle0jiPLLBs70cWOduZpkS1E78=
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f/go.mod h1:cuUVRXasLTGF7a8hSLbxyZXjz+1KgoB3wDUb6vlszIc= github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f/go.mod h1:cuUVRXasLTGF7a8hSLbxyZXjz+1KgoB3wDUb6vlszIc=
github.com/docker/docker v25.0.3+incompatible h1:D5fy/lYmY7bvZa0XTZ5/UJPljor41F+vdyJG5luQLfQ=
github.com/docker/docker v25.0.3+incompatible/go.mod h1:eEKB0N0r5NX/I1kEveEz05bcu8tLC/8azJZsviup8Sk=
github.com/eapache/go-resiliency v1.4.0 h1:3OK9bWpPk5q6pbFAaYSEwD9CLUSHG8bnZuqX2yMt3B0= github.com/eapache/go-resiliency v1.4.0 h1:3OK9bWpPk5q6pbFAaYSEwD9CLUSHG8bnZuqX2yMt3B0=
github.com/eapache/go-resiliency v1.4.0/go.mod h1:5yPzW0MIvSe0JDsv0v+DvcjEv2FyD6iZYSs1ZI+iQho= github.com/eapache/go-resiliency v1.4.0/go.mod h1:5yPzW0MIvSe0JDsv0v+DvcjEv2FyD6iZYSs1ZI+iQho=
github.com/eapache/go-xerial-snappy v0.0.0-20230731223053-c322873962e3 h1:Oy0F4ALJ04o5Qqpdz8XLIpNA3WM/iSIXqxtqo7UGVws= github.com/eapache/go-xerial-snappy v0.0.0-20230731223053-c322873962e3 h1:Oy0F4ALJ04o5Qqpdz8XLIpNA3WM/iSIXqxtqo7UGVws=
......
...@@ -414,6 +414,8 @@ func (wm *WorkerManager) handleWorkerMsg(worker *Worker) { ...@@ -414,6 +414,8 @@ func (wm *WorkerManager) handleWorkerMsg(worker *Worker) {
l.WithFields(log.Fields{ l.WithFields(log.Fields{
"worker-addr": worker.addr, "worker-addr": worker.addr,
}).Debugf("receive worker resource map:%v", msg.ResourceMap) }).Debugf("receive worker resource map:%v", msg.ResourceMap)
wm.UpdateWorkerResourceInfo(worker, msg.ResourceMap.ResourceMap)
case *omanager.WorkerMessage_FetchStandardTask: case *omanager.WorkerMessage_FetchStandardTask:
l.WithFields(log.Fields{ l.WithFields(log.Fields{
"worker-addr": worker.addr, "worker-addr": worker.addr,
...@@ -449,7 +451,6 @@ func (wm *WorkerManager) handleWorkerMsg(worker *Worker) { ...@@ -449,7 +451,6 @@ func (wm *WorkerManager) handleWorkerMsg(worker *Worker) {
} }
case *omanager.WorkerMessage_DeviceInfo: case *omanager.WorkerMessage_DeviceInfo:
// todo: handler worker device info
l.WithFields(log.Fields{ l.WithFields(log.Fields{
"worker-addr": worker.addr, "worker-addr": worker.addr,
}).Debugf("receive worker device info:%v", msg.DeviceInfo) }).Debugf("receive worker device info:%v", msg.DeviceInfo)
...@@ -479,7 +480,7 @@ func (wm *WorkerManager) handleWorkerMsg(worker *Worker) { ...@@ -479,7 +480,7 @@ func (wm *WorkerManager) handleWorkerMsg(worker *Worker) {
worker.addr = addr worker.addr = addr
if worker.addr != "" { if worker.addr != "" {
infoData, err := json.Marshal(msg.DeviceInfo.Devices) infoData, err := json.Marshal(msg.DeviceInfo)
if err != nil { if err != nil {
l.WithFields(log.Fields{ l.WithFields(log.Fields{
"worker-addr": worker.addr, "worker-addr": worker.addr,
...@@ -499,6 +500,9 @@ func (wm *WorkerManager) handleWorkerMsg(worker *Worker) { ...@@ -499,6 +500,9 @@ func (wm *WorkerManager) handleWorkerMsg(worker *Worker) {
case *omanager.WorkerMessage_DeviceUsage: case *omanager.WorkerMessage_DeviceUsage:
// todo: handler worker device usage // todo: handler worker device usage
usageData, _ := json.Marshal(msg.DeviceUsage)
wm.UpdateWorkerDeviceInfo(worker, string(usageData))
worker.usageInfo = msg.DeviceUsage.Usage worker.usageInfo = msg.DeviceUsage.Usage
l.WithFields(log.Fields{ l.WithFields(log.Fields{
"worker-addr": worker.addr, "worker-addr": worker.addr,
......
...@@ -2,15 +2,24 @@ package server ...@@ -2,15 +2,24 @@ package server
import ( import (
"context" "context"
"encoding/hex"
"fmt" "fmt"
"github.com/odysseus/nodemanager/config" "github.com/odysseus/nodemanager/config"
log "github.com/sirupsen/logrus" log "github.com/sirupsen/logrus"
"strconv" "strconv"
) )
func (wm *WorkerManager) UpdateWorkerUsageInfo(worker *Worker, usageInfo string) {
wm.rdb.Set(context.Background(), workerUsageInfoKey(worker), usageInfo, 0)
}
func (wm *WorkerManager) UpdateWorkerDeviceInfo(worker *Worker, deviceInfos string) { func (wm *WorkerManager) UpdateWorkerDeviceInfo(worker *Worker, deviceInfos string) {
deviceInfoKey := config.WORKER_DEVICE_INFO_PREFIX + worker.addr wm.rdb.Set(context.Background(), workerDeviceInfoKey(worker), deviceInfos, 0)
wm.rdb.Set(context.Background(), deviceInfoKey, deviceInfos, 0) }
func (wm *WorkerManager) UpdateWorkerResourceInfo(worker *Worker, resourceInfo []byte) {
rstr := hex.EncodeToString(resourceInfo)
wm.rdb.Set(context.Background(), workerResourceInfoKey(worker), rstr, 0)
} }
func (wm *WorkerManager) AddWorkerFirst(worker *Worker) error { func (wm *WorkerManager) AddWorkerFirst(worker *Worker) error {
...@@ -53,6 +62,22 @@ func (wm *WorkerManager) WorkerNmList(worker *Worker) ([]string, error) { ...@@ -53,6 +62,22 @@ func (wm *WorkerManager) WorkerNmList(worker *Worker) ([]string, error) {
func (wm *WorkerManager) InActiveWorker(worker *Worker) { func (wm *WorkerManager) InActiveWorker(worker *Worker) {
wm.rdb.SRem(context.Background(), workerStatusKey(worker), config.GetConfig().PublicEndpoint()) wm.rdb.SRem(context.Background(), workerStatusKey(worker), config.GetConfig().PublicEndpoint())
if list, err := wm.rdb.SMembers(context.Background(), workerStatusKey(worker)).Result(); err == nil && len(list) == 0 {
wm.rdb.Del(context.Background(), workerStatusKey(worker))
wm.rdb.Del(context.Background())
}
}
func workerResourceInfoKey(w *Worker) string {
return config.WORKER_RESOURCE_INFO_PREFIX + w.addr
}
func workerDeviceInfoKey(w *Worker) string {
return config.WORKER_DEVICE_INFO_PREFIX + w.addr
}
func workerUsageInfoKey(w *Worker) string {
return config.WORKER_USAGE_INFO_PREFIX + w.addr
} }
func workerStatusKey(w *Worker) string { func workerStatusKey(w *Worker) string {
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment