Commit 36c11520 authored by vicotor

add resource check in popworker

parent 9778efce
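This change adds a per-worker resource bitmap to worker selection. In addition to the existing keys built from WORKER_QUEUE_PREFIX (per-priority lists of available worker ids) and WORKER_STATUS_PREFIX (the node managers serving a worker), the scheduler now reads a key built from the new WORKER_RESOURCE_INFO_PREFIX plus the worker address, whose value is a hex-encoded bitmap of the task types the worker supports; PopWorker only hands out workers whose bitmap covers the task's TaskType.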
package config
const (
	NODE_MANAGER_SET            = "node_manager_set"
	WORKER_STATUS_PREFIX        = "worker_status_"
	WORKER_QUEUE_PREFIX         = "worker_queue_"
	WORKER_RESOURCE_INFO_PREFIX = "worker_resource_info_"
)
......@@ -6,6 +6,7 @@ require (
github.com/BurntSushi/toml v1.3.2
github.com/IBM/sarama v1.42.1
github.com/astaxie/beego v1.12.3
github.com/docker/docker v25.0.4+incompatible
github.com/ethereum/go-ethereum v1.13.13
github.com/gogo/protobuf v1.3.2
github.com/google/uuid v1.5.0
......
......@@ -44,6 +44,8 @@ github.com/decred/dcrd/dcrec/secp256k1/v4 v4.0.1 h1:YLtO71vCjJRCBcrPMtQ9nqBsqpA1
github.com/decred/dcrd/dcrec/secp256k1/v4 v4.0.1/go.mod h1:hyedUtir6IdtD/7lIxGeCxkaw7y45JueMRL4DIyJDKs=
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f h1:lO4WD4F/rVNCu3HqELle0jiPLLBs70cWOduZpkS1E78=
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f/go.mod h1:cuUVRXasLTGF7a8hSLbxyZXjz+1KgoB3wDUb6vlszIc=
github.com/docker/docker v25.0.4+incompatible h1:XITZTrq+52tZyZxUOtFIahUf3aH367FLxJzt9vZeAF8=
github.com/docker/docker v25.0.4+incompatible/go.mod h1:eEKB0N0r5NX/I1kEveEz05bcu8tLC/8azJZsviup8Sk=
github.com/eapache/go-resiliency v1.4.0 h1:3OK9bWpPk5q6pbFAaYSEwD9CLUSHG8bnZuqX2yMt3B0=
github.com/eapache/go-resiliency v1.4.0/go.mod h1:5yPzW0MIvSe0JDsv0v+DvcjEv2FyD6iZYSs1ZI+iQho=
github.com/eapache/go-xerial-snappy v0.0.0-20230731223053-c322873962e3 h1:Oy0F4ALJ04o5Qqpdz8XLIpNA3WM/iSIXqxtqo7UGVws=
......
......@@ -229,7 +229,7 @@ func (n *Node) Loop(idx int) {
l := log.WithField("task-id", task.TaskId)
l.WithField("task", task).Info("get task")
for {
worker, err := PopWorker(ctx, n.rdb)
worker, err := n.PopWorker(ctx, n.rdb, task)
if err == ErrNoWorker || err == ErrTimeout {
result := &odysseus.TaskResponse{
TaskId: task.TaskId,
......@@ -351,3 +351,50 @@ func (c *Consumer) ConsumeClaim(session sarama.ConsumerGroupSession, claim saram
}
}
}
func (n *Node) addWorkerBack(w Worker) {
	// put the worker id back on its priority queue so it can be handed out again
	n.rdb.LPush(context.Background(), config.WORKER_QUEUE_PREFIX+strconv.Itoa(w.priority), workerId(w))
	log.WithField("worker", w.workerid).Debug("add worker back to queue")
}

func (n *Node) PopWorker(ctx context.Context, rdb *redis.Client, task *odysseus.TaskContent) (Worker, error) {
	for i := 0; i < maxPriority; i++ {
		for {
			if ctx.Err() != nil {
				return Worker{}, ErrTimeout
			}
			elem, err := rdb.LPop(context.Background(), config.WORKER_QUEUE_PREFIX+strconv.Itoa(i)).Result()
			if err != nil {
				log.WithError(err).Error("lPop worker failed")
				break
			}
			log.WithField("elem", elem).Debug("lPop worker")
			addr, nonce := parseWorkerId(elem)
			managerList, err := rdb.SMembers(context.Background(), workerStatusKey(elem)).Result()
			if err != nil {
				log.WithError(err).Error("get worker status failed")
				continue
			}
			log.WithField("managerList", managerList).Debug("get worker status")
			if len(managerList) == 0 {
				continue
			}
			worker := Worker{
				workerid: elem,
				addr:     addr,
				nonce:    nonce,
				priority: i,
				managers: managerList,
			}
			// requeue workers that do not advertise the task's resource type
			if !checkWorkerHasResource(rdb, worker.addr, task.TaskType) {
				n.addWorkerBack(worker)
				continue
			}
			return worker, nil
		}
	}
	return Worker{}, ErrNoWorker
}
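With this change, PopWorker returns a worker only if checkWorkerHasResource reports that the worker's bitmap includes task.TaskType; otherwise the worker is pushed back onto its priority queue by addWorkerBack and the loop keeps polling, eventually returning ErrNoWorker or ErrTimeout as before.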
......@@ -2,8 +2,10 @@ package server
import (
"context"
"encoding/hex"
"errors"
"fmt"
"github.com/docker/docker/libnetwork/bitmap"
odysseus "github.com/odysseus/odysseus-protocol/gen/proto/go/base/v1"
omanager "github.com/odysseus/odysseus-protocol/gen/proto/go/nodemanager/v1"
"github.com/odysseus/scheduler/config"
......@@ -34,42 +36,25 @@ type Worker struct {
managers []string
}
func PopWorker(ctx context.Context, rdb *redis.Client) (Worker, error) {
	for i := 0; i < maxPriority; i++ {
		for {
			if ctx.Err() != nil {
				return Worker{}, ErrTimeout
			}
			elem, err := rdb.LPop(context.Background(), config.WORKER_QUEUE_PREFIX+strconv.Itoa(i)).Result()
			if err != nil {
				log.WithError(err).Error("lPop worker failed")
				break
			}
			log.WithField("elem", elem).Debug("lPop worker")
			addr, nonce := parseWorkerId(elem)
			managerList, err := rdb.SMembers(context.Background(), workerStatusKey(elem)).Result()
			if err != nil {
				log.WithError(err).Error("get worker status failed")
				continue
			}
			log.WithField("managerList", managerList).Debug("get worker status")
			if len(managerList) == 0 {
				continue
			}
			return Worker{
				workerid: elem,
				addr:     addr,
				nonce:    nonce,
				priority: i,
				managers: managerList,
			}, nil
		}
	}
	return Worker{}, ErrNoWorker
}

func checkWorkerHasResource(rdb *redis.Client, addr string, resource uint64) bool {
	k := workerResourceInfoKey(addr)
	rstr, err := rdb.Get(context.Background(), k).Result()
	if err != nil {
		return false
	}
	// the resource info is stored as a hex-encoded bitmap of supported task types
	data, _ := hex.DecodeString(rstr)
	b := bitmap.New(100000)
	if err := b.UnmarshalBinary(data); err != nil {
		return false
	}
	return b.IsSet(resource)
}

func workerResourceInfoKey(addr string) string {
	return config.WORKER_RESOURCE_INFO_PREFIX + addr
}
func workerStatusKey(wid string) string {
return fmt.Sprintf("%s_%s", config.WORKER_STATUS_PREFIX, wid)
}
......
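The commit only adds the consumer side of the resource info; something on the worker or node-manager side has to publish a matching value under the same key for checkWorkerHasResource to find. Below is a minimal sketch of such a publisher, assuming the hex-encoded docker/libnetwork bitmap format read above; the function name publishWorkerResources, the go-redis v9 import path, and the zero TTL are illustrative assumptions, not part of this commit.

// Package example sketches the producer side of the worker resource info.
// Names below (publishWorkerResources, taskTypes) are illustrative only; the
// key prefix and encoding mirror workerResourceInfoKey and
// checkWorkerHasResource in the diff above.
package example

import (
	"context"
	"encoding/hex"

	"github.com/docker/docker/libnetwork/bitmap"
	"github.com/redis/go-redis/v9" // assumed client version; not shown in this diff
)

const workerResourceInfoPrefix = "worker_resource_info_" // mirrors config.WORKER_RESOURCE_INFO_PREFIX

// publishWorkerResources sets a bit for every task type the worker supports
// and stores the bitmap hex-encoded under worker_resource_info_<addr>.
func publishWorkerResources(ctx context.Context, rdb *redis.Client, addr string, taskTypes []uint64) error {
	b := bitmap.New(100000) // same capacity the scheduler uses when decoding
	for _, t := range taskTypes {
		if err := b.Set(t); err != nil {
			return err
		}
	}
	raw, err := b.MarshalBinary()
	if err != nil {
		return err
	}
	// no expiry here for simplicity; a real publisher would likely refresh the key with a TTL
	return rdb.Set(ctx, workerResourceInfoPrefix+addr, hex.EncodeToString(raw), 0).Err()
}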