Commit b1444840 authored by luxq

implement schedule

parent 815db8ec
......@@ -60,14 +60,19 @@ func initConfig() {
return
}
_, err := config.ParseConfig(viper.ConfigFileUsed())
conf, err := config.ParseConfig(viper.ConfigFileUsed())
if err != nil {
log.WithField("error", err).Fatal("parse config failed")
}
log.Infof("start with config: %+v", conf)
}
func runNode() {
server.StartService()
node := server.NewNode()
err := node.Start()
if err != nil {
log.WithError(err).Fatal("node start failed")
}
wg := sync.WaitGroup{}
wg.Add(1)
......
endpoint=":10001"
metrics_port = 28010
endpoint="127.0.0.1:10002"
metrics_port = 28012
routines = 10
[redis]
addr="127.0.0.1:6379"
password="123456"
db=0
[kafka]
brokers="192.168.1.220:9092"
topic="pbaigc"
\ No newline at end of file
......@@ -6,17 +6,26 @@ import (
"io/ioutil"
)
// RedisConfig holds the Redis connection settings.
type RedisConfig struct {
	Addr     string `json:"addr" toml:"addr"`         // host:port of the Redis server
	Password string `json:"password" toml:"password"` // password used to AUTH (may be empty)
	// NOTE(review): the shipped config file uses the key `db=`, but this tag is
	// "db_index", so DbIndex never unmarshals and silently stays 0 — confirm
	// which key is intended.
	DbIndex int `json:"db_index" toml:"db_index"`
}
type KafkaConfig struct {
Brokers string `json:"addr" toml:"addr"`
Topic string `json:"topic" toml:"topic"`
}
type Config struct {
Endpoint string `json:"endpoint" toml:"endpoint"`
MetricsPort int `json:"metrics_port" toml:"metrics_port"`
Endpoint string `json:"endpoint" toml:"endpoint"`
MetricPort int `json:"metrics_port" toml:"metrics_port"`
Routines int `json:"routines" toml:"routines"`
Redis RedisConfig `json:"redis" toml:"redis"`
Kafka KafkaConfig `json:"kafka" toml:"kafka"`
}
var _cfg *Config = nil
func (conf *Config) MetricPort() int {
return conf.MetricsPort
}
func ParseConfig(path string) (*Config, error) {
data, err := ioutil.ReadFile(path)
if err != nil {
......@@ -24,7 +33,6 @@ func ParseConfig(path string) (*Config, error) {
panic(err)
}
err = toml.Unmarshal(data, &_cfg)
// err = json.Unmarshal(data, &_cfg)
if err != nil {
log.Error("unmarshal config failed", "err", err)
panic(err)
......@@ -35,16 +43,3 @@ func ParseConfig(path string) (*Config, error) {
func GetConfig() *Config {
return _cfg
}
var (
DefaultCors = []string{"localhost"} // Default cors domain for the apis
DefaultVhosts = []string{"localhost"} // Default virtual hosts for the apis
DefaultOrigins = []string{"localhost"} // Default origins for the apis
DefaultPrefix = "" // Default prefix for the apis
DefaultModules = []string{"time"}
)
const (
APIBatchItemLimit = 2000
APIBatchResponseSizeLimit = 250 * 1000 * 1000
)
package config
const (
NODE_MANAGER_SET = "node_manager_set"
WORKER_STATUS_PREFIX = "worker_status_"
WORKER_QUEUE_PREFIX = "worker_queue_"
)
......@@ -4,30 +4,57 @@ go 1.20
require (
github.com/BurntSushi/toml v1.3.2
github.com/IBM/sarama v1.42.1
github.com/ethereum/go-ethereum v1.13.10
github.com/gogo/protobuf v1.3.2
github.com/lestrrat-go/file-rotatelogs v2.4.0+incompatible
github.com/odysseus/odysseus-protocol v0.0.0-00010101000000-000000000000
github.com/prometheus/client_golang v1.18.0
github.com/redis/go-redis/v9 v9.4.0
github.com/rifflock/lfshook v0.0.0-20180920164130-b9218ef580f5
github.com/sirupsen/logrus v1.9.3
github.com/spf13/cobra v1.8.0
github.com/spf13/viper v1.18.2
google.golang.org/grpc v1.60.1
)
require (
github.com/beorn7/perks v1.0.1 // indirect
github.com/btcsuite/btcd/btcec/v2 v2.2.0 // indirect
github.com/cespare/xxhash/v2 v2.2.0 // indirect
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect
github.com/decred/dcrd/dcrec/secp256k1/v4 v4.0.1 // indirect
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect
github.com/eapache/go-resiliency v1.4.0 // indirect
github.com/eapache/go-xerial-snappy v0.0.0-20230731223053-c322873962e3 // indirect
github.com/eapache/queue v1.1.0 // indirect
github.com/fsnotify/fsnotify v1.7.0 // indirect
github.com/golang/protobuf v1.5.3 // indirect
github.com/golang/snappy v0.0.5-0.20220116011046-fa5810519dcb // indirect
github.com/hashicorp/errwrap v1.0.0 // indirect
github.com/hashicorp/go-multierror v1.1.1 // indirect
github.com/hashicorp/go-uuid v1.0.3 // indirect
github.com/hashicorp/hcl v1.0.0 // indirect
github.com/holiman/uint256 v1.2.4 // indirect
github.com/inconshreveable/mousetrap v1.1.0 // indirect
github.com/jcmturner/aescts/v2 v2.0.0 // indirect
github.com/jcmturner/dnsutils/v2 v2.0.0 // indirect
github.com/jcmturner/gofork v1.7.6 // indirect
github.com/jcmturner/gokrb5/v8 v8.4.4 // indirect
github.com/jcmturner/rpc/v2 v2.0.3 // indirect
github.com/jonboulle/clockwork v0.4.0 // indirect
github.com/klauspost/compress v1.17.0 // indirect
github.com/lestrrat-go/strftime v1.0.6 // indirect
github.com/magiconair/properties v1.8.7 // indirect
github.com/matttproud/golang_protobuf_extensions/v2 v2.0.0 // indirect
github.com/mitchellh/mapstructure v1.5.0 // indirect
github.com/pelletier/go-toml/v2 v2.1.0 // indirect
github.com/pierrec/lz4/v4 v4.1.18 // indirect
github.com/pkg/errors v0.9.1 // indirect
github.com/prometheus/client_model v0.5.0 // indirect
github.com/prometheus/common v0.45.0 // indirect
github.com/prometheus/procfs v0.12.0 // indirect
github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475 // indirect
github.com/sagikazarmark/locafero v0.4.0 // indirect
github.com/sagikazarmark/slog-shim v0.1.0 // indirect
github.com/sourcegraph/conc v0.3.0 // indirect
......@@ -37,10 +64,15 @@ require (
github.com/subosito/gotenv v1.6.0 // indirect
go.uber.org/atomic v1.9.0 // indirect
go.uber.org/multierr v1.9.0 // indirect
golang.org/x/exp v0.0.0-20230905200255-921286631fa9 // indirect
golang.org/x/crypto v0.17.0 // indirect
golang.org/x/exp v0.0.0-20231110203233-9a3e6036ecaa // indirect
golang.org/x/net v0.19.0 // indirect
golang.org/x/sys v0.15.0 // indirect
golang.org/x/text v0.14.0 // indirect
google.golang.org/protobuf v1.31.0 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20231120223509-83a465c0220f // indirect
google.golang.org/protobuf v1.32.0 // indirect
gopkg.in/ini.v1 v1.67.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)
replace github.com/odysseus/odysseus-protocol => ../odysseus-protocol
This diff is collapsed.
......@@ -91,7 +91,7 @@ func NewRegisteredMeter(subname string, name string) prometheus.Counter {
}
func StartMetrics() {
addr := fmt.Sprintf(":%d", config.GetConfig().MetricPort())
addr := fmt.Sprintf(":%d", config.GetConfig().MetricPort)
http.Handle("/metrics", promhttp.Handler())
err := http.ListenAndServe(addr, nil)
if err != nil {
......
package server
import (
"context"
"errors"
"github.com/IBM/sarama"
"github.com/gogo/protobuf/proto"
odysseus "github.com/odysseus/odysseus-protocol/gen/proto/go/base/v1"
"github.com/odysseus/scheduler/config"
"github.com/odysseus/scheduler/utils"
"github.com/redis/go-redis/v9"
log "github.com/sirupsen/logrus"
"strings"
"sync"
)
// Node is one scheduler instance: it consumes tasks from Kafka and dispatches
// them to workers tracked in Redis.
type Node struct {
	rdb  *redis.Client  // connection to the Redis instance holding worker queues
	quit chan struct{}  // closed by Stop to terminate all loop routines
	conf *config.Config // global config snapshot taken at construction
	wg   sync.WaitGroup // tracks the running Loop goroutines
}
// NewNode wires up a scheduler node from the global configuration: it opens
// the Redis client described in the config and prepares the quit channel used
// to stop the task loops.
func NewNode() *Node {
	cfg := config.GetConfig()
	connParam := utils.RedisConnParam{
		Addr:     cfg.Redis.Addr,
		Password: cfg.Redis.Password,
		DbIndex:  cfg.Redis.DbIndex,
	}
	return &Node{
		rdb:  utils.NewRedisClient(connParam),
		quit: make(chan struct{}),
		conf: cfg,
	}
}
// Start launches the node's task-processing routines and returns any error
// from starting them (currently always nil, see startAllTask).
func (n *Node) Start() error {
	return n.startAllTask()
}
// Stop signals every loop routine to exit by closing the quit channel and
// blocks until they have all finished.
func (n *Node) Stop() {
	close(n.quit)
	n.wg.Wait()
}
// startAllTask launches the configured number of Loop goroutines, one per
// routine index. It always returns nil; the error return is kept for the
// Start contract.
func (n *Node) startAllTask() error {
	// Use the config snapshot already held by the Node instead of re-fetching
	// the global config, for consistency with the other methods.
	for i := 0; i < n.conf.Routines; i++ {
		n.wg.Add(1)
		go n.Loop(i)
	}
	return nil
}
// Loop is one scheduler routine: it attaches a Kafka consumer that feeds
// decoded tasks into a buffered channel, then pops a worker from Redis for
// each task and dispatches the task to that worker's managers. idx is only
// used for logging. Runs until n.quit is closed.
func (n *Node) Loop(idx int) {
	defer n.wg.Done()
	defer log.WithField("routine", idx).Info("node loop routine exit")
	// monitor kafka
	taskCh := make(chan *odysseus.TaskContent, 1000)
	ctx, cancel := context.WithCancel(context.Background())
	// Always release the context; the previous version leaked it when
	// attachKafkaConsumer failed (go vet "lostcancel").
	defer cancel()
	client, err := n.attachKafkaConsumer(ctx, taskCh)
	if err != nil {
		log.WithError(err).Error("attach kafka consumer failed")
		return
	}
	for {
		select {
		case <-n.quit:
			// NOTE(review): tasks still buffered in taskCh are dropped on
			// shutdown even though their Kafka offsets were already marked —
			// confirm this loss is acceptable.
			cancel()
			client.Close()
			return
		case task := <-taskCh:
			log.WithField("task", task).Info("get task")
			worker, err := PopWorker(n.rdb)
			if err != nil {
				// No worker available: the task is dropped, not requeued.
				log.WithError(err).Error("pop worker failed")
				continue
			}
			err = DispatchTask(worker, task)
			if err != nil {
				log.WithError(err).Error("dispatch task failed")
				continue
			}
		}
	}
}
// attachKafkaConsumer creates a consumer group for the configured brokers and
// starts a goroutine that pushes every consumed task into taskCh until ctx is
// cancelled. The returned group must be closed by the caller.
func (n *Node) attachKafkaConsumer(ctx context.Context, taskCh chan *odysseus.TaskContent) (sarama.ConsumerGroup, error) {
	// Named cfg rather than config: the original local shadowed the imported
	// "config" package inside this function.
	cfg := sarama.NewConfig()
	cfg.Consumer.Return.Errors = true
	cfg.Consumer.Group.Rebalance.GroupStrategies = []sarama.BalanceStrategy{sarama.NewBalanceStrategyRoundRobin()}
	cfg.Consumer.Offsets.Initial = sarama.OffsetNewest

	// split broker to list
	brokers := strings.Split(n.conf.Kafka.Brokers, ";")
	client, err := sarama.NewConsumerGroup(brokers, "group", cfg)
	if err != nil {
		log.WithError(err).Error("creating consumer group client failed")
		return nil, err
	}
	consumeFunc := func(consumer *Consumer) {
		topics := strings.Split(n.conf.Kafka.Topic, ";")
		for {
			// Consume blocks for the lifetime of one session and returns on
			// rebalance or error; loop to join the next session.
			if err := client.Consume(ctx, topics, consumer); err != nil {
				if errors.Is(err, sarama.ErrClosedConsumerGroup) {
					return
				}
				log.WithError(err).Error("error from consumer")
			}
			// check if context was cancelled, signaling that the consumer should stop
			if ctx.Err() != nil {
				return
			}
			// Re-arm the ready channel for the next session (Setup closes it).
			consumer.ready = make(chan bool)
		}
	}
	go consumeFunc(&Consumer{
		ready:  make(chan bool),
		taskCh: taskCh,
	})
	return client, nil
}
// Consumer represents a Sarama consumer group consumer
type Consumer struct {
	ready  chan bool                  // closed by Setup once the session is established
	taskCh chan *odysseus.TaskContent // decoded tasks are delivered here
}
// Setup is run at the beginning of a new session, before ConsumeClaim
func (c *Consumer) Setup(sarama.ConsumerGroupSession) error {
	// Mark the consumer as ready. attachKafkaConsumer replaces the channel
	// before each new session, so the close is never applied twice to one channel.
	close(c.ready)
	return nil
}
// Cleanup is run at the end of a session, once all ConsumeClaim goroutines have exited
func (c *Consumer) Cleanup(sarama.ConsumerGroupSession) error {
	// No per-session state to release.
	return nil
}
// ConsumeClaim must start a consumer loop of ConsumerGroupClaim's Messages().
// Once the Messages() channel is closed, the Handler must finish its processing
// loop and exit.
func (c *Consumer) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
	msgs := claim.Messages()
	for {
		select {
		case <-session.Context().Done():
			// Session cancelled (rebalance or shutdown): stop cleanly.
			return nil
		case msg, ok := <-msgs:
			if !ok {
				log.Printf("message channel was closed")
				return nil
			}
			// Decode the protobuf payload into a task and hand it to the loop.
			task := new(odysseus.TaskContent)
			if err := proto.Unmarshal(msg.Value, task); err != nil {
				log.WithError(err).Error("unmarshal task failed")
				continue
			}
			c.taskCh <- task
			// Mark the offset only after the task has been queued.
			session.MarkMessage(msg, "")
		}
	}
}
package server
import (
"errors"
"fmt"
"github.com/odysseus/scheduler/server/apibackend"
"time"
)
// Shared sentinel errors for the service API.
var (
	ErrParams    = errors.New("invalid request")   // request failed validation
	ErrUnSupport = errors.New("unsupport feature") // requested feature is not implemented
)
// NodeManagerService exposes the scheduler's API backend to node managers.
// NOTE(review): not referenced by any visible code path yet — confirm intended use.
type NodeManagerService struct {
	backend apibackend.Backend // backing implementation of the API
	quit    chan struct{}      // closed to stop the service
}
// StartService runs a diagnostic heartbeat that prints the current time once
// per second. It never returns.
func StartService() {
	// Create one ticker up front and stop it on exit. The previous version
	// called time.Tick inside the select of a for loop, which allocated a new
	// ticker on every iteration and leaked all of them.
	ticker := time.NewTicker(time.Second)
	defer ticker.Stop()
	for range ticker.C {
		// print time per second
		fmt.Println(time.Now())
	}
}
package server
import (
"context"
"errors"
odysseus "github.com/odysseus/odysseus-protocol/gen/proto/go/base/v1"
omanager "github.com/odysseus/odysseus-protocol/gen/proto/go/nodemanager/v1"
"github.com/odysseus/scheduler/config"
redis "github.com/redis/go-redis/v9"
log "github.com/sirupsen/logrus"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
"strconv"
)
var (
	// maxPriority is the number of priority buckets scanned by PopWorker;
	// queue keys are WORKER_QUEUE_PREFIX + "0" .. (maxPriority-1).
	maxPriority = 1 // total priority for worker queue
)
// Worker describes one schedulable worker popped from the Redis queues.
type Worker struct {
	workerid string   // worker identifier taken from the priority queue
	priority int      // index of the queue the worker was popped from
	managers []string // endpoints of node managers that track this worker
}
// PopWorker removes and returns the next available worker, scanning the
// priority queues from highest (0) to lowest. A worker is returned only if at
// least one node manager currently tracks it; Redis errors and untracked
// workers are skipped (best-effort, matching the scheduler's drop semantics).
func PopWorker(rdb *redis.Client) (Worker, error) {
	ctx := context.Background()
	for prio := 0; prio < maxPriority; prio++ {
		workerID, err := rdb.LPop(ctx, config.WORKER_QUEUE_PREFIX+strconv.Itoa(prio)).Result()
		if err != nil {
			continue
		}
		managers, err := rdb.SMembers(ctx, config.WORKER_STATUS_PREFIX+workerID).Result()
		if err != nil {
			continue
		}
		if len(managers) == 0 {
			continue
		}
		return Worker{
			workerid: workerID,
			priority: prio,
			managers: managers,
		}, nil
	}
	return Worker{}, errors.New("no worker")
}
// newManagerClient dials the given node-manager endpoint without TLS and
// returns a gRPC service client for it. Send/receive message sizes are raised
// to 1 GiB to allow large task payloads.
func newManagerClient(endpoint string) (omanager.NodeManagerServiceClient, error) {
	opts := []grpc.DialOption{
		grpc.WithTransportCredentials(insecure.NewCredentials()),
		grpc.WithDefaultCallOptions(
			grpc.MaxCallRecvMsgSize(1024*1024*1024),
			grpc.MaxCallSendMsgSize(1024*1024*1024),
		),
	}
	conn, err := grpc.Dial(endpoint, opts...)
	if err != nil {
		return nil, err
	}
	return omanager.NewNodeManagerServiceClient(conn), nil
}
// DispatchTask tries each of the worker's node managers in order and sends the
// task to the first one that accepts it. It returns an error only when every
// manager fails.
func DispatchTask(worker Worker, task *odysseus.TaskContent) error {
	for _, manager := range worker.managers {
		// Dial here instead of via newManagerClient so the underlying
		// *grpc.ClientConn can be closed: newManagerClient returns only the
		// service client and leaked one connection per dispatch attempt.
		conn, err := grpc.Dial(manager,
			grpc.WithTransportCredentials(insecure.NewCredentials()),
			grpc.WithDefaultCallOptions(
				grpc.MaxCallRecvMsgSize(1024*1024*1024),
				grpc.MaxCallSendMsgSize(1024*1024*1024)),
		)
		if err != nil {
			log.WithFields(log.Fields{
				"manager": manager,
				"error":   err,
			}).Error("connect to manager failed")
			continue
		}
		client := omanager.NewNodeManagerServiceClient(conn)
		_, err = client.DispatchTask(context.Background(), &omanager.DispatchTaskRequest{
			Miner:    worker.workerid,
			TaskData: task,
		})
		conn.Close()
		if err != nil {
			log.WithFields(log.Fields{
				"manager": manager,
				"error":   err,
			}).Error("dispatch to manager failed")
			continue
		}
		return nil
	}
	return errors.New("dispatch to manager all failed")
}
package utils
// CombineBytes concatenates any number of byte slices into one new slice.
// Like the append-based original, it returns nil when there is nothing to
// combine.
func CombineBytes(b ...[]byte) []byte {
	total := 0
	for _, part := range b {
		total += len(part)
	}
	if total == 0 {
		// Preserve the original behavior: no data yields a nil slice.
		return nil
	}
	out := make([]byte, 0, total)
	for _, part := range b {
		out = append(out, part...)
	}
	return out
}
package utils
import (
"crypto/ecdsa"
"encoding/hex"
"github.com/ethereum/go-ethereum/crypto"
)
// HexToPrivatekey parses a hex-encoded ECDSA private key via go-ethereum's crypto package.
func HexToPrivatekey(key string) (*ecdsa.PrivateKey, error) {
	return crypto.HexToECDSA(key)
}
// PrivatekeyToHex serializes a private key to its hex string form
// (inverse of HexToPrivatekey).
func PrivatekeyToHex(key *ecdsa.PrivateKey) string {
	return hex.EncodeToString(crypto.FromECDSA(key))
}
// PrivatekeyToAddress derives the Ethereum-style address string for the
// public key of the given private key.
func PrivatekeyToAddress(key *ecdsa.PrivateKey) string {
	return crypto.PubkeyToAddress(key.PublicKey).String()
}
// PubkeyToAddress derives the Ethereum-style address string for a public key.
func PubkeyToAddress(key *ecdsa.PublicKey) string {
	return crypto.PubkeyToAddress(*key).String()
}
// PubkeyToHex serializes a public key to the hex form of its ECDSA encoding
// (inverse of HexToPubkey).
func PubkeyToHex(key *ecdsa.PublicKey) string {
	pub := crypto.FromECDSAPub(key)
	return hex.EncodeToString(pub)
}
// HexToPubkey parses a hex string produced by PubkeyToHex back into a public
// key. It fails on invalid hex or on bytes that are not a valid key encoding.
func HexToPubkey(key string) (*ecdsa.PublicKey, error) {
	pub, err := hex.DecodeString(key)
	if err != nil {
		return nil, err
	}
	return crypto.UnmarshalPubkey(pub)
}
package utils
import (
"github.com/redis/go-redis/v9"
)
// RedisConnParam bundles the settings needed to open a Redis connection.
type RedisConnParam struct {
	Addr     string // host:port of the Redis server
	Password string // password used to AUTH (may be empty)
	DbIndex  int    // logical database number
}
// NewRedisClient builds a go-redis client from the given connection parameters.
func NewRedisClient(param RedisConnParam) *redis.Client {
	opts := redis.Options{
		Addr:     param.Addr,
		Password: param.Password,
		DB:       param.DbIndex,
	}
	return redis.NewClient(&opts)
}
package utils
import (
"sync"
"time"
)
var (
	machineID     int64      // machine id, pre-shifted left past the 12 sequence bits; 10 bits, [0, 1023]
	sn            int64      // per-timestamp sequence number; 12 bits, [0, 4095]
	lastTimeStamp int64      // timestamp (microseconds since epoch) of the last generated id
	mu            sync.Mutex // guards sn/lastTimeStamp for GetSnowflakeId
)

func init() {
	// NOTE(review): the timestamp granularity here is UnixNano()/1000, i.e.
	// MICROseconds, although the comments below describe the classic
	// millisecond snowflake layout; the 41-bit mask then wraps far sooner than
	// the usual ~69 years — confirm this is intended.
	machineID = 101 << 12
	lastTimeStamp = time.Now().UnixNano() / 1000
}

// GetSnowflakeIdProcess builds one snowflake-style id: 41 bits of timestamp,
// 10 bits of machine id and 12 bits of sequence number. It is NOT safe for
// concurrent use; callers should go through GetSnowflakeId.
func GetSnowflakeIdProcess() int64 {
	curTimeStamp := time.Now().UnixNano() / 1000
	if curTimeStamp == lastTimeStamp {
		// Sequence exhausted within this timestamp: wait until the clock has
		// actually advanced. The previous version slept once and reset sn even
		// when the timestamp had not changed, which could hand out duplicate
		// ids, and its `sn > 4095` check let sn reach 4096, overflowing the
		// 12-bit sequence field into the machine-id bits.
		if sn >= 4095 {
			for curTimeStamp <= lastTimeStamp {
				time.Sleep(time.Microsecond)
				curTimeStamp = time.Now().UnixNano() / 1000
			}
			sn = 0
		}
	} else {
		sn = 0
	}
	sn++
	lastTimeStamp = curTimeStamp
	// Keep the low 41 bits of the timestamp and shift them above the 10-bit
	// machine id and 12-bit sequence fields.
	rightBinValue := curTimeStamp & 0x1FFFFFFFFFF
	rightBinValue <<= 22
	return rightBinValue | machineID | sn
}

// GetSnowflakeId is the concurrency-safe entry point for id generation.
func GetSnowflakeId() int64 {
	mu.Lock()
	defer mu.Unlock()
	return GetSnowflakeIdProcess()
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment