Commit 802ac706 authored by vicotor's avatar vicotor

update worker

parent 41802e81
......@@ -5,7 +5,6 @@ import (
"errors"
"github.com/odysseus/nodemanager/utils"
omanager "github.com/odysseus/odysseus-protocol/gen/proto/go/nodemanager/v1"
"strconv"
)
var (
......@@ -45,11 +44,8 @@ func (n *NodeManagerService) DispatchTask(ctx context.Context, request *omanager
if request == nil {
return nil, errors.New("invalid request")
}
uuid, err := strconv.ParseInt(request.Miner, 10, 64)
if err != nil {
return nil, errors.New("not found worker")
}
worker := n.node.wm.GetWorker(uuid)
workerAddr := request.Miner
worker := n.node.wm.GetWorkerByAddr(workerAddr)
dtask := &dispatchTask{
task: request.TaskData,
errCh: make(chan error, 1),
......
......@@ -49,6 +49,7 @@ type WorkerManager struct {
hbRwLock sync.RWMutex
workers map[int64]*Worker
workid map[string]*Worker
wkRwLock sync.RWMutex
quit chan struct{}
......@@ -61,6 +62,7 @@ func NewWorkerManager(rdb *redis.Client, node *Node) *WorkerManager {
return &WorkerManager{
heartBeat: make(map[int64]int64),
workers: make(map[int64]*Worker),
workid: make(map[string]*Worker),
quit: make(chan struct{}),
rdb: rdb,
node: node,
......@@ -95,6 +97,21 @@ func (wm *WorkerManager) GetWorker(uuid int64) *Worker {
return wm.workers[uuid]
}
func (wm *WorkerManager) SetWorkerAddr(worker *Worker, addr string) {
wm.wkRwLock.Lock()
defer wm.wkRwLock.Unlock()
worker.addr = addr
wm.workid[addr] = worker
}
func (wm *WorkerManager) GetWorkerByAddr(addr string) *Worker {
wm.wkRwLock.RLock()
defer wm.wkRwLock.RUnlock()
return wm.workid[addr]
}
func (wm *WorkerManager) AddNewWorker(uuid int64, worker omanager.NodeManagerService_RegisterWorkerServer) (*Worker, error) {
wm.wkRwLock.Lock()
defer wm.wkRwLock.Unlock()
......@@ -278,10 +295,12 @@ func (wm *WorkerManager) manageWorker(worker *Worker) error {
worker.recentTask.Remove(result.TaskId)
}
taskResponse := &odysseus.TaskResponse{
TaskId: task.TaskId,
TaskResult: result.TaskResult,
TaskUid: task.TaskUid,
TaskFee: task.TaskFee,
TaskId: task.TaskId,
TaskResult: result.TaskResult,
TaskUid: task.TaskUid,
TaskFee: task.TaskFee,
TaskIsSucceed: true,
TaskError: "",
}
d, _ := proto.Marshal(taskResponse)
go func() {
......@@ -341,6 +360,7 @@ func (wm *WorkerManager) handleWorkerMsg(worker *Worker) {
}).Debug("receive worker heartbeat")
case *omanager.WorkerMessage_Status:
// todo: store worker status
worker.status = msg.Status.DeviceStatus
log.WithFields(log.Fields{
"worker": worker.uuid,
}).Debugf("receive worker status:0x%x", msg.Status.DeviceStatus)
......@@ -363,11 +383,13 @@ func (wm *WorkerManager) handleWorkerMsg(worker *Worker) {
}
if worker.addr != "" {
wm.AddWorker(worker)
wm.SetWorkerAddr(worker, worker.addr)
}
}
case *omanager.WorkerMessage_DeviceUsage:
// todo: handler worker device usage
worker.usageInfo = msg.DeviceUsage.Usage
log.WithFields(log.Fields{
"worker": worker.uuid,
}).Debugf("receive worker device usage:%v", msg.DeviceUsage.Usage)
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment