Commit 4b3eff99 authored by vicotor's avatar vicotor

add scheduler v2

parent ddd03ae2
metrics_port = 28012 metrics_port = 28012
routines = 1 routines = 1
max_nm_update_ex = 40 max_nm_update_ex = 40
dispatch_timeout = 30 dispatch_timeout = 3
[mongodb]
url="mongodb://127.0.0.1:27017"
database="ai"
user="admin"
password="admin"
[redis] [redis]
addr="127.0.0.1:6379" addr="127.0.0.1:6379"
...@@ -9,15 +15,18 @@ password="123456" ...@@ -9,15 +15,18 @@ password="123456"
db=0 db=0
[mysql] [mysql]
host="192.168.1.211" #host="192.168.1.211"
#port=3306
#user="root"
#password="12345678"
#database="liuxuzhong"
host="18.167.203.17"
port=3306 port=3306
user="root" user="ai"
password="12345678" password="RFnnKHRar5xk7TEF"
database="liuxuzhong" database="ai"
[kafka] [kafka]
#brokers="192.168.1.108:9092"
#brokers="192.168.1.220:9092"
brokers="127.0.0.1:9092" brokers="127.0.0.1:9092"
task_topic="pbaigc" task_topic="pbaigc"
receipt_topic="taskreceipt" receipt_topic="taskreceipt"
...@@ -25,14 +25,22 @@ type MysqlConfig struct { ...@@ -25,14 +25,22 @@ type MysqlConfig struct {
DbName string `json:"database" toml:"database"` DbName string `json:"database" toml:"database"`
} }
type MongoDbConfig struct {
Url string `json:"url" toml:"url"`
Database string `json:"database" toml:"database"`
User string `json:"user" toml:"user"`
Passwd string `json:"password" toml:"password"`
}
type Config struct { type Config struct {
MetricPort int `json:"metrics_port" toml:"metrics_port"` MetricPort int `json:"metrics_port" toml:"metrics_port"`
Routines int `json:"routines" toml:"routines"` Routines int `json:"routines" toml:"routines"`
MaxNmUpdateEx int `json:"max_nm_update_ex" toml:"max_nm_update_ex"` MaxNmUpdateEx int `json:"max_nm_update_ex" toml:"max_nm_update_ex"`
DispatchTimeout int `json:"dispatch_timeout" toml:"dispatch_timeout"` DispatchTimeout int `json:"dispatch_timeout" toml:"dispatch_timeout"`
Redis RedisConfig `json:"redis" toml:"redis"` Redis RedisConfig `json:"redis" toml:"redis"`
Kafka KafkaConfig `json:"kafka" toml:"kafka"` Kafka KafkaConfig `json:"kafka" toml:"kafka"`
DbConfig MysqlConfig `json:"mysql" toml:"mysql"` DbConfig MysqlConfig `json:"mysql" toml:"mysql"`
Mongo MongoDbConfig `json:"mongodb" toml:"mongodb"`
} }
var _cfg *Config = nil var _cfg *Config = nil
......
module github.com/odysseus/scheduler module github.com/odysseus/scheduler
go 1.20 go 1.21
toolchain go1.21.5
require ( require (
github.com/BurntSushi/toml v1.3.2 github.com/BurntSushi/toml v1.3.2
github.com/IBM/sarama v1.42.1 github.com/IBM/sarama v1.42.1
github.com/astaxie/beego v1.12.3
github.com/docker/docker v25.0.4+incompatible github.com/docker/docker v25.0.4+incompatible
github.com/ethereum/go-ethereum v1.13.13 github.com/ethereum/go-ethereum v1.13.13
github.com/gogo/protobuf v1.3.2 github.com/gogo/protobuf v1.3.2
github.com/google/uuid v1.5.0 github.com/google/uuid v1.6.0
github.com/lestrrat-go/file-rotatelogs v2.4.0+incompatible github.com/lestrrat-go/file-rotatelogs v2.4.0+incompatible
github.com/odysseus/cache v0.0.0-00010101000000-000000000000 github.com/odysseus/cache v0.0.0-00010101000000-000000000000
github.com/odysseus/mogo v0.0.0-00010101000000-000000000000
github.com/odysseus/odysseus-protocol v0.0.0-00010101000000-000000000000 github.com/odysseus/odysseus-protocol v0.0.0-00010101000000-000000000000
github.com/odysseus/service-registry v0.0.0-00010101000000-000000000000 github.com/odysseus/service-registry v0.0.0-00010101000000-000000000000
github.com/prometheus/client_golang v1.18.0 github.com/prometheus/client_golang v1.18.0
...@@ -20,10 +22,12 @@ require ( ...@@ -20,10 +22,12 @@ require (
github.com/sirupsen/logrus v1.9.3 github.com/sirupsen/logrus v1.9.3
github.com/spf13/cobra v1.8.0 github.com/spf13/cobra v1.8.0
github.com/spf13/viper v1.18.2 github.com/spf13/viper v1.18.2
go.mongodb.org/mongo-driver v1.15.0
google.golang.org/grpc v1.60.1 google.golang.org/grpc v1.60.1
) )
require ( require (
github.com/astaxie/beego v1.12.3 // indirect
github.com/beorn7/perks v1.0.1 // indirect github.com/beorn7/perks v1.0.1 // indirect
github.com/btcsuite/btcd/btcec/v2 v2.2.0 // indirect github.com/btcsuite/btcd/btcec/v2 v2.2.0 // indirect
github.com/cespare/xxhash/v2 v2.2.0 // indirect github.com/cespare/xxhash/v2 v2.2.0 // indirect
...@@ -56,6 +60,7 @@ require ( ...@@ -56,6 +60,7 @@ require (
github.com/magiconair/properties v1.8.7 // indirect github.com/magiconair/properties v1.8.7 // indirect
github.com/matttproud/golang_protobuf_extensions/v2 v2.0.0 // indirect github.com/matttproud/golang_protobuf_extensions/v2 v2.0.0 // indirect
github.com/mitchellh/mapstructure v1.5.0 // indirect github.com/mitchellh/mapstructure v1.5.0 // indirect
github.com/montanaflynn/stats v0.0.0-20171201202039-1bf9dbcd8cbe // indirect
github.com/pelletier/go-toml/v2 v2.1.0 // indirect github.com/pelletier/go-toml/v2 v2.1.0 // indirect
github.com/pierrec/lz4/v4 v4.1.18 // indirect github.com/pierrec/lz4/v4 v4.1.18 // indirect
github.com/pkg/errors v0.9.1 // indirect github.com/pkg/errors v0.9.1 // indirect
...@@ -70,17 +75,21 @@ require ( ...@@ -70,17 +75,21 @@ require (
github.com/spf13/cast v1.6.0 // indirect github.com/spf13/cast v1.6.0 // indirect
github.com/spf13/pflag v1.0.5 // indirect github.com/spf13/pflag v1.0.5 // indirect
github.com/subosito/gotenv v1.6.0 // indirect github.com/subosito/gotenv v1.6.0 // indirect
github.com/xdg-go/pbkdf2 v1.0.0 // indirect
github.com/xdg-go/scram v1.1.2 // indirect
github.com/xdg-go/stringprep v1.0.4 // indirect
github.com/youmark/pkcs8 v0.0.0-20181117223130-1be2e3e5546d // indirect
go.uber.org/atomic v1.9.0 // indirect go.uber.org/atomic v1.9.0 // indirect
go.uber.org/multierr v1.9.0 // indirect go.uber.org/multierr v1.9.0 // indirect
golang.org/x/crypto v0.17.0 // indirect golang.org/x/crypto v0.17.0 // indirect
golang.org/x/exp v0.0.0-20231110203233-9a3e6036ecaa // indirect golang.org/x/exp v0.0.0-20231110203233-9a3e6036ecaa // indirect
golang.org/x/net v0.19.0 // indirect golang.org/x/net v0.19.0 // indirect
golang.org/x/sync v0.5.0 // indirect
golang.org/x/sys v0.16.0 // indirect golang.org/x/sys v0.16.0 // indirect
golang.org/x/text v0.14.0 // indirect golang.org/x/text v0.14.0 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20231120223509-83a465c0220f // indirect google.golang.org/genproto/googleapis/rpc v0.0.0-20231120223509-83a465c0220f // indirect
google.golang.org/protobuf v1.32.0 // indirect google.golang.org/protobuf v1.32.0 // indirect
gopkg.in/ini.v1 v1.67.0 // indirect gopkg.in/ini.v1 v1.67.0 // indirect
gopkg.in/yaml.v2 v2.4.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect
) )
...@@ -89,3 +98,5 @@ replace github.com/odysseus/odysseus-protocol => ../odysseus-protocol ...@@ -89,3 +98,5 @@ replace github.com/odysseus/odysseus-protocol => ../odysseus-protocol
replace github.com/odysseus/cache => ../cache replace github.com/odysseus/cache => ../cache
replace github.com/odysseus/service-registry => ../service-registry replace github.com/odysseus/service-registry => ../service-registry
replace github.com/odysseus/mogo => ../mogo
This diff is collapsed.
...@@ -11,6 +11,7 @@ import ( ...@@ -11,6 +11,7 @@ import (
odysseus "github.com/odysseus/odysseus-protocol/gen/proto/go/base/v1" odysseus "github.com/odysseus/odysseus-protocol/gen/proto/go/base/v1"
"github.com/odysseus/scheduler/config" "github.com/odysseus/scheduler/config"
"github.com/odysseus/scheduler/utils" "github.com/odysseus/scheduler/utils"
"github.com/odysseus/scheduler/workerpoper"
"github.com/odysseus/service-registry/common" "github.com/odysseus/service-registry/common"
"github.com/odysseus/service-registry/query" "github.com/odysseus/service-registry/query"
"github.com/odysseus/service-registry/registry" "github.com/odysseus/service-registry/registry"
...@@ -31,8 +32,9 @@ type Node struct { ...@@ -31,8 +32,9 @@ type Node struct {
wg sync.WaitGroup wg sync.WaitGroup
status string status string
reg *registry.Registry reg *registry.Registry
receiptCh chan *odysseus.TaskReceipt
receiptCh chan *odysseus.TaskReceipt poper workerpoper.WorkerPoper
} }
func (n *Node) ServiceType() common.ServiceType { func (n *Node) ServiceType() common.ServiceType {
...@@ -67,12 +69,18 @@ func NewNode() *Node { ...@@ -67,12 +69,18 @@ func NewNode() *Node {
Passwd: dbconf.Passwd, Passwd: dbconf.Passwd,
DbName: dbconf.DbName, DbName: dbconf.DbName,
}) })
poper := workerpoper.NewPopWorker()
if poper == nil {
panic("failed to create poper")
}
node := &Node{ node := &Node{
cache: pay, cache: pay,
rdb: rdb, rdb: rdb,
quit: make(chan struct{}), quit: make(chan struct{}),
conf: config.GetConfig(), conf: config.GetConfig(),
receiptCh: make(chan *odysseus.TaskReceipt, 100000), receiptCh: make(chan *odysseus.TaskReceipt, 100000),
poper: poper,
} }
node.status = "before running" node.status = "before running"
node.register() node.register()
...@@ -228,8 +236,9 @@ func (n *Node) Loop(idx int) { ...@@ -228,8 +236,9 @@ func (n *Node) Loop(idx int) {
go func(ctx context.Context, task *odysseus.TaskContent) { go func(ctx context.Context, task *odysseus.TaskContent) {
l := log.WithField("task-id", task.TaskId) l := log.WithField("task-id", task.TaskId)
l.WithField("task", task).Info("get task") l.WithField("task", task).Info("get task")
failed := make(map[string]bool)
for { for {
worker, err := n.PopWorker(ctx, n.rdb, t) worker, err := n.poper.PopWorker(ctx, n.rdb, t, failed)
if err == ErrNoWorker || err == ErrTimeout { if err == ErrNoWorker || err == ErrTimeout {
result := &odysseus.TaskResponse{ result := &odysseus.TaskResponse{
TaskId: task.TaskId, TaskId: task.TaskId,
...@@ -255,6 +264,7 @@ func (n *Node) Loop(idx int) { ...@@ -255,6 +264,7 @@ func (n *Node) Loop(idx int) {
err = n.DispatchTask(ctx, worker, task) err = n.DispatchTask(ctx, worker, task)
if err != nil { if err != nil {
l.WithError(err).Error("dispatch task failed") l.WithError(err).Error("dispatch task failed")
failed[worker.Workerid] = true
continue continue
} else { } else {
l.Info("dispatch task success") l.Info("dispatch task success")
...@@ -351,64 +361,3 @@ func (c *Consumer) ConsumeClaim(session sarama.ConsumerGroupSession, claim saram ...@@ -351,64 +361,3 @@ func (c *Consumer) ConsumeClaim(session sarama.ConsumerGroupSession, claim saram
} }
} }
} }
func (n *Node) addWorkerBack(w Worker) {
n.rdb.RPush(context.Background(), config.WORKER_QUEUE_PREFIX+strconv.Itoa(w.priority), w.workerid)
log.WithField("worker", w.workerid).Debug("add worker back to queue")
}
func (n *Node) PopWorker(ctx context.Context, rdb *redis.Client, task *odysseus.TaskContent) (Worker, error) {
var checkedWorker = make(map[string]bool)
for i := 0; i < maxPriority; i++ {
for {
if ctx.Err() != nil {
return Worker{}, ErrTimeout
}
elem, err := rdb.LPop(context.Background(), config.WORKER_QUEUE_PREFIX+strconv.Itoa(i)).Result()
if err != nil {
log.WithError(err).Error("lPop worker failed")
break
}
log.WithField("elem", elem).Debug("lPop worker")
addr, nonce := parseWorkerId(elem)
log.WithFields(log.Fields{
"addr": addr,
"nonce": nonce,
}).Debug("parsed worker")
managerList, err := rdb.SMembers(context.Background(), workerStatusKey(addr, nonce)).Result()
if err != nil {
log.WithError(err).Error("get worker status failed")
continue
}
log.WithFields(log.Fields{
"managerList": managerList,
"statuskey": workerStatusKey(addr, nonce),
}).Debug("get worker status")
if len(managerList) == 0 {
continue
}
worker := Worker{
workerid: elem,
addr: addr,
nonce: nonce,
priority: i,
managers: managerList,
}
if false {
if !checkWorkerHasResource(rdb, worker.addr, task.TaskType) {
n.addWorkerBack(worker)
if checked := checkedWorker[worker.workerid]; checked {
break
} else {
checkedWorker[worker.workerid] = true
continue
}
}
}
return worker, nil
}
}
return Worker{}, ErrNoWorker
}
...@@ -2,14 +2,11 @@ package server ...@@ -2,14 +2,11 @@ package server
import ( import (
"context" "context"
"encoding/hex"
"errors" "errors"
"fmt"
"github.com/docker/docker/libnetwork/bitmap"
odysseus "github.com/odysseus/odysseus-protocol/gen/proto/go/base/v1" odysseus "github.com/odysseus/odysseus-protocol/gen/proto/go/base/v1"
omanager "github.com/odysseus/odysseus-protocol/gen/proto/go/nodemanager/v1" omanager "github.com/odysseus/odysseus-protocol/gen/proto/go/nodemanager/v1"
"github.com/odysseus/scheduler/config" "github.com/odysseus/scheduler/config"
redis "github.com/redis/go-redis/v9" "github.com/odysseus/scheduler/types"
log "github.com/sirupsen/logrus" log "github.com/sirupsen/logrus"
"google.golang.org/grpc" "google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure" "google.golang.org/grpc/credentials/insecure"
...@@ -18,51 +15,12 @@ import ( ...@@ -18,51 +15,12 @@ import (
"time" "time"
) )
var (
maxPriority = 2 // total priority for worker queue
)
var ( var (
ErrNoWorker = errors.New("no worker") ErrNoWorker = errors.New("no worker")
ErrTimeout = errors.New("schedule timeout") ErrTimeout = errors.New("schedule timeout")
ErrDispatchFailed = errors.New("dispatch to nodemanager failed") ErrDispatchFailed = errors.New("dispatch to nodemanager failed")
) )
type Worker struct {
workerid string
addr string
nonce int64
priority int
managers []string
}
func checkWorkerHasResource(rdb *redis.Client, addr string, resource uint64) bool {
k := workerResourceInfoKey(addr)
rstr, err := rdb.Get(context.Background(), k).Result()
if err != nil {
return false
}
data, _ := hex.DecodeString(rstr)
b := bitmap.New(100000)
if err := b.UnmarshalBinary(data); err != nil {
return false
}
return b.IsSet(resource)
}
func workerResourceInfoKey(addr string) string {
return config.WORKER_RESOURCE_INFO_PREFIX + addr
}
func workerStatusKey(addr string, nonce int64) string {
id := workerId(addr, nonce)
return fmt.Sprintf("%s_%s", config.WORKER_STATUS_PREFIX, id)
}
func workerId(addr string, nonce int64) string {
return fmt.Sprintf("%s_%d", addr, nonce)
}
func newManagerClient(endpoint string) (omanager.NodeManagerServiceClient, error) { func newManagerClient(endpoint string) (omanager.NodeManagerServiceClient, error) {
client, err := grpc.Dial(endpoint, client, err := grpc.Dial(endpoint,
grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithTransportCredentials(insecure.NewCredentials()),
...@@ -78,18 +36,6 @@ func newManagerClient(endpoint string) (omanager.NodeManagerServiceClient, error ...@@ -78,18 +36,6 @@ func newManagerClient(endpoint string) (omanager.NodeManagerServiceClient, error
} }
func parseWorkerId(elem string) (string, int64) {
split := "_"
strs := strings.Split(elem, split)
if len(strs) == 2 {
addr := strs[0]
nonceds := strings.Split(strs[1], ":")
nonce, _ := strconv.ParseInt(nonceds[0], 10, 64)
return addr, nonce
}
return "", 0
}
func parseWorkerNmValue(nmValue string) (string, int64) { func parseWorkerNmValue(nmValue string) (string, int64) {
split := "#" split := "#"
strs := strings.Split(nmValue, split) strs := strings.Split(nmValue, split)
...@@ -101,22 +47,20 @@ func parseWorkerNmValue(nmValue string) (string, int64) { ...@@ -101,22 +47,20 @@ func parseWorkerNmValue(nmValue string) (string, int64) {
return "", 0 return "", 0
} }
func (n *Node) DispatchTask(ctx context.Context, worker Worker, task *odysseus.TaskContent) error { func (n *Node) DispatchTask(ctx context.Context, worker types.Worker, task *odysseus.TaskContent) error {
l := log.WithField("task-id", task.TaskId) l := log.WithField("task-id", task.TaskId)
l.WithFields(log.Fields{ l.WithFields(log.Fields{
"worker": worker.workerid, "worker": worker.Workerid,
"managerList": worker.managers, "managerList": worker.Managers,
}).Debug("dispatch task to worker") }).Debug("dispatch task to worker")
var shouldAddBack = false var shouldAddBack = false
defer func(w Worker) { defer func(w types.Worker) {
if shouldAddBack { if shouldAddBack && n.poper.CanAddBack() {
// add worker back to redis queue. n.poper.AddBack(w)
n.rdb.LPush(context.Background(), config.WORKER_QUEUE_PREFIX+strconv.Itoa(w.priority), w.workerid)
l.WithField("worker", worker.workerid).Debug("add worker back to queue")
} }
}(worker) }(worker)
for _, manager := range worker.managers { for _, manager := range worker.Managers {
endpoint, updateTime := parseWorkerNmValue(manager) endpoint, updateTime := parseWorkerNmValue(manager)
if time.Now().Unix()-updateTime > int64(config.GetConfig().MaxNmUpdateEx) { if time.Now().Unix()-updateTime > int64(config.GetConfig().MaxNmUpdateEx) {
l.WithField("manager", manager).Warn("ignore the manager because of not update tm") l.WithField("manager", manager).Warn("ignore the manager because of not update tm")
...@@ -132,7 +76,7 @@ func (n *Node) DispatchTask(ctx context.Context, worker Worker, task *odysseus.T ...@@ -132,7 +76,7 @@ func (n *Node) DispatchTask(ctx context.Context, worker Worker, task *odysseus.T
continue continue
} }
_, err = client.DispatchTask(ctx, &omanager.DispatchTaskRequest{ _, err = client.DispatchTask(ctx, &omanager.DispatchTaskRequest{
Miner: worker.workerid, Miner: worker.Workerid,
TaskData: task, TaskData: task,
}) })
if err != nil { if err != nil {
......
package types
type Worker struct {
Workerid string
Addr string
Nonce int64
Priority int
Managers []string
}
package utils
import (
"context"
"go.mongodb.org/mongo-driver/mongo"
"go.mongodb.org/mongo-driver/mongo/options"
"time"
)
func ConnectMongoDB(uri string, username, passwd string) (*mongo.Client, error) {
ctx, _ := context.WithTimeout(context.Background(), 10*time.Second)
client, err := mongo.Connect(ctx, options.Client().ApplyURI(uri).SetAuth(options.Credential{
Username: username,
Password: passwd,
}))
if err != nil {
return nil, err
}
return client, nil
}
package workerpoper
import (
"context"
"encoding/hex"
"fmt"
"github.com/docker/docker/libnetwork/bitmap"
odysseus "github.com/odysseus/odysseus-protocol/gen/proto/go/base/v1"
"github.com/odysseus/scheduler/config"
"github.com/odysseus/scheduler/types"
"github.com/odysseus/scheduler/utils"
"github.com/redis/go-redis/v9"
log "github.com/sirupsen/logrus"
"strconv"
"strings"
)
var (
maxPriority = 2 // total priority for worker queue
)
type poperV1 struct {
rdb *redis.Client
}
func newPoperV1() (*poperV1, error) {
redisConfig := config.GetConfig().Redis
rdb := utils.NewRedisClient(utils.RedisConnParam{
Addr: redisConfig.Addr,
Password: redisConfig.Password,
DbIndex: redisConfig.DbIndex,
})
return &poperV1{rdb: rdb}, nil
}
func (p *poperV1) CanAddBack() bool {
return true
}
func (p *poperV1) AddBack(w types.Worker) {
p.addWorkerBack(w)
}
func (p *poperV1) addWorkerBack(w types.Worker) {
p.rdb.RPush(context.Background(), config.WORKER_QUEUE_PREFIX+strconv.Itoa(w.Priority), w.Workerid)
log.WithField("worker", w.Workerid).Debug("add worker back to queue")
}
func (p *poperV1) PopWorker(ctx context.Context, rdb *redis.Client, task *odysseus.TaskContent, ex map[string]bool) (types.Worker, error) {
// PopWorker implementation
var checkedWorker = make(map[string]bool)
for i := 0; i < maxPriority; i++ {
for {
if ctx.Err() != nil {
return types.Worker{}, ErrTimeout
}
elem, err := rdb.LPop(context.Background(), config.WORKER_QUEUE_PREFIX+strconv.Itoa(i)).Result()
if err != nil {
log.WithError(err).Error("lPop worker failed")
break
}
log.WithField("elem", elem).Debug("lPop worker")
addr, nonce := parseWorkerId(elem)
log.WithFields(log.Fields{
"addr": addr,
"nonce": nonce,
}).Debug("parsed worker")
managerList, err := rdb.SMembers(context.Background(), workerStatusKey(addr, nonce)).Result()
if err != nil {
log.WithError(err).Error("get worker status failed")
continue
}
log.WithFields(log.Fields{
"managerList": managerList,
"statuskey": workerStatusKey(addr, nonce),
}).Debug("get worker status")
if len(managerList) == 0 {
continue
}
worker := types.Worker{
Workerid: elem,
Addr: addr,
Nonce: nonce,
Priority: i,
Managers: managerList,
}
if false {
if !checkWorkerHasResource(rdb, worker.Addr, task.TaskType) {
p.addWorkerBack(worker)
if checked := checkedWorker[worker.Workerid]; checked {
break
} else {
checkedWorker[worker.Workerid] = true
continue
}
}
}
return worker, nil
}
}
return types.Worker{}, ErrNoWorker
}
func parseWorkerId(elem string) (string, int64) {
split := "_"
strs := strings.Split(elem, split)
if len(strs) == 2 {
addr := strs[0]
nonceds := strings.Split(strs[1], ":")
nonce, _ := strconv.ParseInt(nonceds[0], 10, 64)
return addr, nonce
}
return "", 0
}
func workerStatusKey(addr string, nonce int64) string {
id := workerId(addr, nonce)
return fmt.Sprintf("%s_%s", config.WORKER_STATUS_PREFIX, id)
}
func workerId(addr string, nonce int64) string {
return fmt.Sprintf("%s_%d", addr, nonce)
}
func checkWorkerHasResource(rdb *redis.Client, addr string, resource uint64) bool {
k := workerResourceInfoKey(addr)
rstr, err := rdb.Get(context.Background(), k).Result()
if err != nil {
return false
}
data, _ := hex.DecodeString(rstr)
b := bitmap.New(100000)
if err := b.UnmarshalBinary(data); err != nil {
return false
}
return b.IsSet(resource)
}
func workerResourceInfoKey(addr string) string {
return config.WORKER_RESOURCE_INFO_PREFIX + addr
}
package workerpoper
import (
"context"
"github.com/odysseus/mogo/operator"
odysseus "github.com/odysseus/odysseus-protocol/gen/proto/go/base/v1"
"github.com/odysseus/scheduler/config"
"github.com/odysseus/scheduler/types"
"github.com/odysseus/scheduler/utils"
"github.com/redis/go-redis/v9"
log "github.com/sirupsen/logrus"
"go.mongodb.org/mongo-driver/mongo"
"math/rand"
)
type poperV2 struct {
mogo *mongo.Client
workerInfoOperator *operator.WorkerInfoOperator
workerInstalledOperator *operator.WorkerInstalledOperator
workerRunningOperator *operator.WorkerRunningOperator
}
func newPoperV2() (*poperV2, error) {
mogo, err := utils.ConnectMongoDB(config.GetConfig().Mongo.Url, config.GetConfig().Mongo.User, config.GetConfig().Mongo.Passwd)
if err != nil {
log.WithError(err).Error("failed to connect mongodb")
return nil, err
}
p := &poperV2{
mogo: mogo,
workerInfoOperator: operator.NewDBWorker(mogo, config.GetConfig().Mongo.Database),
workerInstalledOperator: operator.NewDBWorkerInstalled(mogo, config.GetConfig().Mongo.Database),
workerRunningOperator: operator.NewDBWorkerRunning(mogo, config.GetConfig().Mongo.Database),
}
return p, nil
}
func (p *poperV2) CanAddBack() bool {
return false
}
func (p *poperV2) AddBack(w types.Worker) {
}
func (p *poperV2) PopWorker(ctx context.Context, rdb *redis.Client, task *odysseus.TaskContent, ex map[string]bool) (types.Worker, error) {
// 1. select from running model worker.
workers, err := p.workerRunningOperator.FindWorkerByModelId(ctx, int(task.TaskType), 10)
if err != nil {
log.WithField("tasktype", task.TaskType).WithError(err).Error("get running model worker failed")
return types.Worker{}, err
}
validWorkers := make([]types.Worker, 0)
for _, worker := range workers {
if ex[worker.WorkerId] {
continue
}
addr := worker.WorkerId
nonce := int64(0)
managerList, err := rdb.SMembers(context.Background(), workerStatusKey(addr, nonce)).Result()
if err != nil {
log.WithError(err).Error("get worker status failed")
continue
}
log.WithFields(log.Fields{
"managerList": managerList,
"statuskey": workerStatusKey(addr, nonce),
}).Debug("get worker status that found with running model")
if len(managerList) == 0 {
continue
}
w := types.Worker{
Workerid: worker.WorkerId,
Addr: addr,
Nonce: nonce,
Priority: 0,
Managers: managerList,
}
validWorkers = append(validWorkers, w)
}
if len(validWorkers) == 0 {
// 2. select worker who has installed model.
workers, err := p.workerInstalledOperator.FindWorkerByModelId(ctx, int(task.TaskType), 10)
if err != nil {
log.WithField("tasktype", task.TaskType).WithError(err).Error("get installed model worker failed")
return types.Worker{}, err
}
for _, worker := range workers {
if ex[worker.WorkerId] {
continue
}
addr := worker.WorkerId
nonce := int64(0)
managerList, err := rdb.SMembers(context.Background(), workerStatusKey(addr, nonce)).Result()
if err != nil {
log.WithError(err).Error("get worker status failed")
continue
}
log.WithFields(log.Fields{
"managerList": managerList,
"statuskey": workerStatusKey(addr, nonce),
}).Debug("get worker status that found with installed model")
if len(managerList) == 0 {
continue
}
w := types.Worker{
Workerid: worker.WorkerId,
Addr: addr,
Nonce: nonce,
Priority: 0,
Managers: managerList,
}
validWorkers = append(validWorkers, w)
}
}
if len(validWorkers) > 0 {
// todo: random select an worker
rdm := rand.Intn(len(validWorkers))
worker := validWorkers[rdm]
return worker, nil
}
return types.Worker{}, ErrNoWorker
}
package workerpoper
import (
"context"
"errors"
odysseus "github.com/odysseus/odysseus-protocol/gen/proto/go/base/v1"
"github.com/odysseus/scheduler/types"
"github.com/redis/go-redis/v9"
log "github.com/sirupsen/logrus"
)
var (
ErrNoWorker = errors.New("no worker")
ErrTimeout = errors.New("schedule timeout")
)
type WorkerPoper interface {
CanAddBack() bool
AddBack(worker types.Worker)
PopWorker(ctx context.Context, rdb *redis.Client, task *odysseus.TaskContent, ex map[string]bool) (types.Worker, error)
}
func NewPopWorker() WorkerPoper {
v2, err := newPoperV2()
if err != nil {
log.WithField("err", err).Error("failed to create poperV2")
return nil
}
return v2
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment