Commit f2d55c8b authored by vicotor's avatar vicotor

fix bug

parent 231eb8ee
...@@ -5,6 +5,7 @@ import ( ...@@ -5,6 +5,7 @@ import (
"errors" "errors"
"github.com/odysseus/nodemanager/utils" "github.com/odysseus/nodemanager/utils"
omanager "github.com/odysseus/odysseus-protocol/gen/proto/go/nodemanager/v1" omanager "github.com/odysseus/odysseus-protocol/gen/proto/go/nodemanager/v1"
"strings"
) )
var ( var (
...@@ -45,7 +46,11 @@ func (n *NodeManagerService) DispatchTask(ctx context.Context, request *omanager ...@@ -45,7 +46,11 @@ func (n *NodeManagerService) DispatchTask(ctx context.Context, request *omanager
if request == nil { if request == nil {
return nil, errors.New("invalid request") return nil, errors.New("invalid request")
} }
workerAddr := request.Miner mids := strings.Split(request.Miner, "_")
if len(mids) != 2 {
return nil, errors.New("invalid miner")
}
workerAddr := mids[0]
worker := n.node.wm.GetWorkerByAddr(workerAddr) worker := n.node.wm.GetWorkerByAddr(workerAddr)
if worker == nil { if worker == nil {
return nil, errors.New("worker not found") return nil, errors.New("worker not found")
...@@ -67,6 +72,6 @@ func (n *NodeManagerService) DispatchTask(ctx context.Context, request *omanager ...@@ -67,6 +72,6 @@ func (n *NodeManagerService) DispatchTask(ctx context.Context, request *omanager
res := new(omanager.DispatchTaskResponse) res := new(omanager.DispatchTaskResponse)
res.TaskId = request.TaskData.TaskId res.TaskId = request.TaskData.TaskId
res.Miner = request.Miner res.Miner = workerAddr
return res, nil return res, nil
} }
...@@ -201,7 +201,7 @@ func (wm *WorkerManager) manageWorker(worker *Worker) error { ...@@ -201,7 +201,7 @@ func (wm *WorkerManager) manageWorker(worker *Worker) error {
}).Info("exit manage worker") }).Info("exit manage worker")
worker.online = false worker.online = false
wm.InActiveWorker(worker.addr) wm.InActiveWorker(worker)
}() }()
for { for {
...@@ -230,7 +230,7 @@ func (wm *WorkerManager) manageWorker(worker *Worker) error { ...@@ -230,7 +230,7 @@ func (wm *WorkerManager) manageWorker(worker *Worker) error {
deviceUsageTicker.Reset(time.Second * time.Duration(tickerConf.DeviceUsageTicker)) deviceUsageTicker.Reset(time.Second * time.Duration(tickerConf.DeviceUsageTicker))
} }
if time.Now().Unix()-wm.GetHeartBeat(worker.uuid) > int64(workerCheckDuration.Seconds()) { if time.Now().Unix()-wm.GetHeartBeat(worker.uuid) > int64(workerCheckDuration.Seconds()) {
wm.InActiveWorker(worker.addr) wm.InActiveWorker(worker)
// todo: remove worker // todo: remove worker
return ErrHeartBeatExpired return ErrHeartBeatExpired
} }
...@@ -326,9 +326,8 @@ func (wm *WorkerManager) manageWorker(worker *Worker) error { ...@@ -326,9 +326,8 @@ func (wm *WorkerManager) manageWorker(worker *Worker) error {
callback = func(err error) bool { callback = func(err error) bool {
// remove task from cache. // remove task from cache.
worker.recentTask.Remove(result.TaskId) worker.recentTask.Remove(result.TaskId)
_ = wm.AddWorkerSingle(worker)
if task.TaskKind != odysseus.TaskKind_StandardTask { if task.TaskKind != odysseus.TaskKind_StandardTask {
_ = wm.AddWorkerSingle(worker)
wm.Payment(task) wm.Payment(task)
} }
return true return true
...@@ -447,6 +446,11 @@ func (wm *WorkerManager) handleWorkerMsg(worker *Worker) { ...@@ -447,6 +446,11 @@ func (wm *WorkerManager) handleWorkerMsg(worker *Worker) {
// addr is not change. // addr is not change.
continue continue
} }
if worker.addr != "" {
wm.InActiveWorker(worker)
}
worker.addr = addr worker.addr = addr
if worker.addr != "" { if worker.addr != "" {
infoData, err := json.Marshal(msg.DeviceInfo.Devices) infoData, err := json.Marshal(msg.DeviceInfo.Devices)
...@@ -462,7 +466,6 @@ func (wm *WorkerManager) handleWorkerMsg(worker *Worker) { ...@@ -462,7 +466,6 @@ func (wm *WorkerManager) handleWorkerMsg(worker *Worker) {
} }
worker.deviceInfoHash = infoHash[:] worker.deviceInfoHash = infoHash[:]
} }
wm.InActiveWorker(addr)
wm.AddWorkerFirst(worker) wm.AddWorkerFirst(worker)
wm.SetWorkerAddr(worker, worker.addr) wm.SetWorkerAddr(worker, worker.addr)
} }
......
...@@ -2,6 +2,7 @@ package server ...@@ -2,6 +2,7 @@ package server
import ( import (
"context" "context"
"fmt"
"github.com/odysseus/nodemanager/config" "github.com/odysseus/nodemanager/config"
log "github.com/sirupsen/logrus" log "github.com/sirupsen/logrus"
"strconv" "strconv"
...@@ -10,7 +11,6 @@ import ( ...@@ -10,7 +11,6 @@ import (
func (wm *WorkerManager) UpdateWorkerDeviceInfo(worker *Worker, deviceInfos string) { func (wm *WorkerManager) UpdateWorkerDeviceInfo(worker *Worker, deviceInfos string) {
deviceInfoKey := config.WORKER_DEVICE_INFO_PREFIX + worker.addr deviceInfoKey := config.WORKER_DEVICE_INFO_PREFIX + worker.addr
wm.rdb.Set(context.Background(), deviceInfoKey, deviceInfos, 0) wm.rdb.Set(context.Background(), deviceInfoKey, deviceInfos, 0)
} }
func (wm *WorkerManager) AddWorkerFirst(worker *Worker) error { func (wm *WorkerManager) AddWorkerFirst(worker *Worker) error {
...@@ -19,37 +19,43 @@ func (wm *WorkerManager) AddWorkerFirst(worker *Worker) error { ...@@ -19,37 +19,43 @@ func (wm *WorkerManager) AddWorkerFirst(worker *Worker) error {
// add device to redis // add device to redis
priority := 0 priority := 0
_ = device // todo: set priority with device info. _ = device // todo: set priority with device info.
if err := wm.rdb.RPush(context.Background(), config.WORKER_QUEUE_PREFIX+strconv.Itoa(priority), worker.addr).Err(); err != nil { if err := wm.rdb.RPush(context.Background(), config.WORKER_QUEUE_PREFIX+strconv.Itoa(priority), workerId(worker)).Err(); err != nil {
continue continue
} }
} }
// add worker to redis queue // add worker to redis queue
if err := wm.rdb.SAdd(context.Background(), config.WORKER_STATUS_PREFIX+worker.addr, config.GetConfig().PublicEndpoint()).Err(); err != nil { wm.ActiveWorker(worker)
return err
}
return nil return nil
} }
func (wm *WorkerManager) AddWorkerSingle(worker *Worker) error { func (wm *WorkerManager) AddWorkerSingle(worker *Worker) error {
log.WithField("worker", worker.addr).Info("add worker on back.") log.WithField("worker", worker.addr).Info("add worker on back.")
{ {
// add worker to redis // add worker to redis queue
priority := 0 priority := 0
if err := wm.rdb.RPush(context.Background(), config.WORKER_QUEUE_PREFIX+strconv.Itoa(priority), worker.addr).Err(); err != nil { if err := wm.rdb.RPush(context.Background(), config.WORKER_QUEUE_PREFIX+strconv.Itoa(priority), workerId(worker)).Err(); err != nil {
log.WithError(err).Error("add worker back to queue failed.") log.WithError(err).Error("add worker back to queue failed.")
} }
} }
// add worker to redis queue // add worker to redis queue
if err := wm.rdb.SAdd(context.Background(), config.WORKER_STATUS_PREFIX+worker.addr, config.GetConfig().PublicEndpoint()).Err(); err != nil { wm.ActiveWorker(worker)
return err
}
return nil return nil
} }
func (wm *WorkerManager) ActiveWorker(worker *Worker) { func (wm *WorkerManager) ActiveWorker(worker *Worker) {
wm.rdb.SAdd(context.Background(), config.WORKER_STATUS_PREFIX+worker.addr, config.GetConfig().PublicEndpoint()) wm.rdb.SAdd(context.Background(), workerStatusKey(worker), config.GetConfig().PublicEndpoint())
}
func (wm *WorkerManager) InActiveWorker(worker *Worker) {
wm.rdb.SRem(context.Background(), workerStatusKey(worker), config.GetConfig().PublicEndpoint())
}
func workerStatusKey(w *Worker) string {
id := workerId(w)
return fmt.Sprintf("%s_%s", config.WORKER_STATUS_PREFIX, id)
} }
func (wm *WorkerManager) InActiveWorker(addr string) { func workerId(w *Worker) string {
wm.rdb.SRem(context.Background(), config.WORKER_STATUS_PREFIX+addr, config.GetConfig().PublicEndpoint()) return fmt.Sprintf("%s_%d", w.addr, w.uuid)
} }
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment