package server

import (
	"context"
	"encoding/json"
	"errors"
	"net/http"
	_ "net/http/pprof"
	"strconv"
	"strings"
	"sync"
	"time"

	"github.com/IBM/sarama"
	"github.com/gogo/protobuf/proto"
	"github.com/odysseus/cache/cachedata"
	"github.com/odysseus/cache/model"
	odysseus "github.com/odysseus/odysseus-protocol/gen/proto/go/base/v1"
	"github.com/odysseus/scheduler/config"
	"github.com/odysseus/scheduler/types"
	"github.com/odysseus/scheduler/utils"
	"github.com/odysseus/scheduler/workerpoper"
	"github.com/odysseus/service-registry/common"
	"github.com/odysseus/service-registry/query"
	"github.com/odysseus/service-registry/registry"
	"github.com/redis/go-redis/v9"
	log "github.com/sirupsen/logrus"
)

// Node is the scheduler service: it consumes tasks from Kafka, pops an
// available worker from Redis, and dispatches each task to that worker.
type Node struct {
	rdb           *redis.Client
	quit          chan struct{}
	conf          *config.Config
	kafkaProducer sarama.AsyncProducer
	cache         *cachedata.CacheData
	wg            sync.WaitGroup
	status        string
	reg           *registry.Registry
	receiptCh     chan *odysseus.TaskReceipt
	poper         workerpoper.WorkerPoper
}

func (n *Node) ServiceType() common.ServiceType {
	return common.SERVICE_SCHEDULER
}

func (n *Node) Status() string {
	return n.status
}

func (n *Node) DetailInfo() (json.RawMessage, error) {
	info := query.SchedulerInfo{}
	return json.Marshal(info)
}

func NewNode() *Node {
	redisConfig := config.GetConfig().Redis
	rdb := utils.NewRedisClient(utils.RedisConnParam{
		Addr:     redisConfig.Addr,
		Password: redisConfig.Password,
		DbIndex:  redisConfig.DbIndex,
	})
	dbconf := config.GetConfig().DbConfig
	pay := cachedata.NewCacheData(context.Background(), cachedata.RedisConnParam{
		Addr:     redisConfig.Addr,
		Password: redisConfig.Password,
		DbIndex:  redisConfig.DbIndex,
	}, model.DbConfig{
		Host:   dbconf.Host,
		Port:   dbconf.Port,
		User:   dbconf.User,
		Passwd: dbconf.Passwd,
		DbName: dbconf.DbName,
	})
	poper := workerpoper.NewPopWorker()
	if poper == nil {
		panic("failed to create poper")
	}
	node := &Node{
		cache:     pay,
		rdb:       rdb,
		quit:      make(chan struct{}),
		conf:      config.GetConfig(),
		receiptCh: make(chan *odysseus.TaskReceipt, 100000),
		poper:     poper,
	}
	node.status = "before running"
	node.register()
	return node
}

func (n *Node) Start() error {
	n.status = "running"
	go n.startProducer()
	if n.reg != nil {
		go n.reg.Start()
	}
	// start pprof
	go func() {
		if err := http.ListenAndServe("0.0.0.0:6060", nil); err != nil {
			log.WithError(err).Error("pprof failed")
		}
	}()
	return n.startAllTask()
}

// startProducer keeps a Kafka producer alive and drains receiptCh into the
// receipt topic, rebuilding the producer if it is closed underneath us.
func (n *Node) startProducer() {
	brokers := strings.Split(n.conf.Kafka.Brokers, ";")
	for {
		producer, err := utils.NewKafkaProducer(brokers)
		if err != nil {
			log.WithError(err).Error("create kafka producer failed")
			time.Sleep(time.Second)
			continue
		}
		n.kafkaProducer = producer

		running := true
		for running {
			select {
			case <-n.quit:
				return
			case receipt, ok := <-n.receiptCh:
				if !ok {
					return
				}
				if err := utils.FireTaskReceipt(n.kafkaProducer, receipt, config.GetConfig().Kafka.ReceiptTopic); err != nil {
					log.WithError(err).WithField("task-id", receipt.TaskId).Error("fire task receipt failed")
					// The producer is gone; leave the inner loop so the
					// outer loop can rebuild it.
					if err == sarama.ErrClosedClient {
						running = false
					}
				}
			}
		}
	}
}

func (n *Node) Stop() {
	close(n.quit)
	n.wg.Wait()
	if n.reg != nil {
		n.reg.Stop()
	}
	n.status = "stopped"
}

func (n *Node) startAllTask() error {
	for i := 0; i < config.GetConfig().Routines; i++ {
		n.wg.Add(1)
		go n.Loop(i)
	}
	return nil
}

func (n *Node) register() {
	reg := registry.NewRegistry(registry.RedisConnParam{
		Addr:     n.conf.Redis.Addr,
		Password: n.conf.Redis.Password,
		DbIndex:  n.conf.Redis.DbIndex,
	}, n)
	n.reg = reg
}
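// runNodeSketch is an illustrative usage sketch, not part of the original
// file: it shows the intended Node lifecycle (construct, run, shut down)
// using only the methods defined above. It assumes config.GetConfig() has
// already been initialized by the caller; the sleep is a stand-in for real
// lifecycle management such as signal handling.
func runNodeSketch() {
	node := NewNode()
	go func() {
		if err := node.Start(); err != nil {
			log.WithError(err).Error("scheduler exited with error")
		}
	}()
	time.Sleep(time.Minute) // placeholder for a real shutdown trigger
	node.Stop()
}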
// postResult delivers the task outcome to the caller's callback URL and
// refunds the pre-charged fee, since the task was never executed.
func (n *Node) postResult(task *odysseus.TaskContent, result *odysseus.TaskResponse) error {
	d, err := proto.Marshal(result)
	if err != nil {
		return err
	}
	err = utils.Post(task.TaskCallback, d)
	if err != nil {
		log.WithError(err).Error("post task result failed")
	} else {
		log.WithField("taskid", task.TaskId).Debug("post task result")
	}
	uid, _ := strconv.ParseInt(task.TaskUid, 10, 64)
	fee, _ := strconv.ParseInt(task.TaskFee, 10, 64)
	n.cache.RollbackForFee(uid, fee)
	return err
}

// postReceipt queues a zero-fee receipt describing why the task was not
// dispatched; startProducer forwards it to the receipt topic.
func (n *Node) postReceipt(task *odysseus.TaskContent, result *odysseus.TaskResponse, err error) error {
	receipt := &odysseus.TaskReceipt{
		TaskId:              task.TaskId,
		TaskTimestamp:       task.TaskTimestamp,
		TaskType:            task.TaskType,
		TaskUid:             task.TaskUid,
		TaskWorkload:        task.TaskWorkload,
		TaskDuration:        (time.Now().UnixNano() - int64(task.TaskTimestamp)) / 1000, // microseconds, assuming TaskTimestamp is in nanoseconds
		TaskFee:             0,
		TaskOutLen:          0,
		TaskExecuteDuration: 0,
		TaskProfitAccount:   "",
		TaskWorkerAccount:   "",
	}
	switch err {
	case types.ErrNoWorker, types.ErrTimeout, ErrDispatchFailed:
		receipt.TaskResult = err.Error()
	default:
		receipt.TaskResult = "internal error"
	}
	n.receiptCh <- receipt
	return nil
}

// Loop attaches a Kafka consumer and, for each task received, spawns a
// goroutine that pops a worker and dispatches the task to it.
func (n *Node) Loop(idx int) {
	defer n.wg.Done()
	defer log.WithField("routine", idx).Info("node loop routine exit")

	var (
		err    error
		client sarama.ConsumerGroup
	)
	// consume tasks from kafka
	taskCh := make(chan *odysseus.TaskContent, 1000)
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	for {
		client, err = attachKafkaConsumer(ctx, taskCh, n)
		if err != nil {
			log.WithError(err).Error("attach kafka consumer failed, try again later")
			time.Sleep(time.Second)
			n.status = "attach kafka failed"
			continue
		}
		log.WithField("routine", idx).Info("attach kafka consumer success")
		n.status = "running"

		running := true
		for running {
			select {
			case clientErr, ok := <-client.Errors():
				if !ok || clientErr == sarama.ErrClosedClient {
					// the consumer group is gone; leave the inner loop
					// so the outer loop can reattach
					running = false
					continue
				}
				log.WithError(clientErr).Error("kafka consumer error")
			case <-n.quit:
				cancel()
				client.Close()
				return
			case t := <-taskCh:
				fctx, fcancel := context.WithTimeout(context.Background(), time.Second*time.Duration(config.GetConfig().DispatchTimeout))
				go func(ctx context.Context, cancel context.CancelFunc, task *odysseus.TaskContent) {
					defer cancel()
					l := log.WithField("task-id", task.TaskId)
					l.WithField("task", task).Info("get task")
					var (
						failed = make(map[string]bool)
						t1     = time.Now()
						stop   = false
						terr   error
					)
					for !stop {
						var worker types.Worker
						worker, terr = n.poper.PopWorker(ctx, n.rdb, task, failed)
						if terr == types.ErrNoWorker || terr == types.ErrTimeout {
							// No worker is available: report the failure to
							// the caller, refund the fee, and give up.
							result := &odysseus.TaskResponse{
								TaskId:        task.TaskId,
								TaskUid:       task.TaskUid,
								TaskFee:       task.TaskFee,
								TaskIsSucceed: false,
								TaskError:     terr.Error(),
							}
							if perr := n.postReceipt(task, result, terr); perr != nil {
								l.WithError(perr).Error("post task receipt failed")
							}
							if perr := n.postResult(task, result); perr != nil {
								l.WithError(perr).Error("post task result failed")
							}
							stop = true
							continue
						}
						if terr != nil {
							l.WithError(terr).Error("pop worker failed")
							continue
						}
						terr = n.DispatchTask(ctx, worker, task)
						if terr != nil {
							l.WithError(terr).Error("dispatch task failed")
							failed[worker.Workerid] = true
							continue
						}
						stop = true
					}
					l.WithFields(log.Fields{
						"timecost": time.Since(t1).Milliseconds(),
						"err":      terr,
					}).Debug("scheduler task finished")
				}(fctx, fcancel, t)
			}
		}
	}
}
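// popAndDispatchSketch is a hypothetical condensation, not part of the
// original file, of the per-task retry pattern inside Loop: pop a worker,
// skip workers that already failed dispatch, and stop once DispatchTask
// succeeds or PopWorker gives up (types.ErrNoWorker / types.ErrTimeout,
// or the context deadline expiring inside PopWorker).
func (n *Node) popAndDispatchSketch(ctx context.Context, task *odysseus.TaskContent) error {
	failed := make(map[string]bool)
	for {
		worker, err := n.poper.PopWorker(ctx, n.rdb, task, failed)
		if err != nil {
			return err // no worker available: the caller posts receipt and result
		}
		if err := n.DispatchTask(ctx, worker, task); err != nil {
			failed[worker.Workerid] = true // never retry this worker for this task
			continue
		}
		return nil
	}
}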
func attachKafkaConsumer(ctx context.Context, taskCh chan *odysseus.TaskContent, n *Node) (sarama.ConsumerGroup, error) {
	config := sarama.NewConfig()
	config.Consumer.Return.Errors = true
	config.Consumer.Group.Rebalance.GroupStrategies = []sarama.BalanceStrategy{sarama.NewBalanceStrategyRoundRobin()}
	//config.Consumer.Offsets.Initial = sarama.OffsetOldest
	config.Consumer.Offsets.Initial = sarama.OffsetNewest

	// split brokers into a list
	brokers := strings.Split(n.conf.Kafka.Brokers, ";")
	// note: the consumer group ID is hardcoded to "test"
	client, err := sarama.NewConsumerGroup(brokers, "test", config)
	if err != nil {
		log.WithError(err).Error("creating consumer group client failed")
		return nil, err
	}

	consumeFunc := func(consumer *Consumer) {
		topics := strings.Split(n.conf.Kafka.TaskTopic, ";")
		for {
			// Consume blocks until the session ends (e.g. on rebalance),
			// so it is called in a loop to rejoin the group.
			if err := client.Consume(ctx, topics, consumer); err != nil {
				if errors.Is(err, sarama.ErrClosedConsumerGroup) {
					n.status = "kafka consumer closed"
					return
				}
				log.WithError(err).Error("error from consumer")
			}
			// check if context was cancelled, signaling that the consumer should stop
			if ctx.Err() != nil {
				log.WithError(ctx.Err()).Error("consumer kafka context was cancelled")
				return
			}
			consumer.ready = make(chan bool)
		}
	}
	go consumeFunc(&Consumer{
		ready:  make(chan bool),
		taskCh: taskCh,
	})
	return client, nil
}

// Consumer represents a Sarama consumer group consumer
type Consumer struct {
	ready  chan bool
	taskCh chan *odysseus.TaskContent
}

// Setup is run at the beginning of a new session, before ConsumeClaim
func (c *Consumer) Setup(sarama.ConsumerGroupSession) error {
	// Mark the consumer as ready
	close(c.ready)
	return nil
}

// Cleanup is run at the end of a session, once all ConsumeClaim goroutines have exited
func (c *Consumer) Cleanup(sarama.ConsumerGroupSession) error {
	return nil
}

// ConsumeClaim must start a consumer loop of ConsumerGroupClaim's Messages().
// Once the Messages() channel is closed, the Handler must finish its processing
// loop and exit.
func (c *Consumer) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
	for {
		select {
		case message, ok := <-claim.Messages():
			if !ok {
				log.Info("message channel was closed")
				return nil
			}
			// Decode the protobuf task and hand it to the scheduling loop.
			task := new(odysseus.TaskContent)
			if err := proto.Unmarshal(message.Value, task); err != nil {
				log.WithError(err).Error("unmarshal task failed")
				continue
			}
			c.taskCh <- task
			session.MarkMessage(message, "")
		case <-session.Context().Done():
			return nil
		}
	}
}
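// produceTaskSketch is a hypothetical producer-side counterpart of
// ConsumeClaim, not part of the original file: it protobuf-marshals a
// TaskContent and publishes it to the given task topic, mirroring the wire
// format ConsumeClaim expects to decode.
func produceTaskSketch(producer sarama.AsyncProducer, topic string, task *odysseus.TaskContent) error {
	payload, err := proto.Marshal(task)
	if err != nil {
		return err
	}
	producer.Input() <- &sarama.ProducerMessage{
		Topic: topic,
		Value: sarama.ByteEncoder(payload),
	}
	return nil
}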