package server

import (
	"context"
	"encoding/json"
	"errors"
	"net/http"
	_ "net/http/pprof"
	"strconv"
	"strings"
	"sync"
	"time"

	"github.com/IBM/sarama"
	"github.com/gogo/protobuf/proto"
	"github.com/odysseus/cache/cachedata"
	"github.com/odysseus/cache/model"
	odysseus "github.com/odysseus/odysseus-protocol/gen/proto/go/base/v1"
	"github.com/odysseus/scheduler/config"
	"github.com/odysseus/scheduler/types"
	"github.com/odysseus/scheduler/utils"
	"github.com/odysseus/scheduler/workerpoper"
	"github.com/odysseus/service-registry/common"
	"github.com/odysseus/service-registry/query"
	"github.com/odysseus/service-registry/registry"
	"github.com/redis/go-redis/v9"
	log "github.com/sirupsen/logrus"
)

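// Node is the scheduler service node. It consumes tasks from Kafka, pops
// workers from Redis via the configured WorkerPoper, dispatches tasks to
// them, and pushes task receipts back to Kafka.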
type Node struct {
	rdb           *redis.Client
	quit          chan struct{}
	conf          *config.Config
	kafkaProducer sarama.AsyncProducer
	cache         *cachedata.CacheData
	wg            sync.WaitGroup
	status        string
	reg           *registry.Registry
	receiptCh     chan *odysseus.TaskReceipt

	poper workerpoper.WorkerPoper
}

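// ServiceType identifies this node as a scheduler to the service registry.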
func (n *Node) ServiceType() common.ServiceType {
	return common.SERVICE_SCHEDULER
}

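// Status returns the node's current status string.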
func (n *Node) Status() string {
	return n.status
}

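// DetailInfo returns the scheduler's registry detail info serialized as JSON.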
func (n *Node) DetailInfo() (json.RawMessage, error) {
	info := query.SchedulerInfo{}
	return json.Marshal(info)
}

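// NewNode builds a scheduler node from the global configuration: it connects
// to Redis, initializes the billing cache, creates the worker poper, and
// prepares the registry client.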
func NewNode() *Node {
	redisConfig := config.GetConfig().Redis
	rdb := utils.NewRedisClient(utils.RedisConnParam{
		Addr:     redisConfig.Addr,
		Password: redisConfig.Password,
		DbIndex:  redisConfig.DbIndex,
	})
	dbconf := config.GetConfig().DbConfig
	pay := cachedata.NewCacheData(context.Background(), cachedata.RedisConnParam{
		Addr:     redisConfig.Addr,
		Password: redisConfig.Password,
		DbIndex:  redisConfig.DbIndex,
	}, model.DbConfig{
		Host:   dbconf.Host,
		Port:   dbconf.Port,
		User:   dbconf.User,
		Passwd: dbconf.Passwd,
		DbName: dbconf.DbName,
	})
	poper := workerpoper.NewPopWorker()
	if poper == nil {
		panic("failed to create poper")
	}

	node := &Node{
		cache:     pay,
		rdb:       rdb,
		quit:      make(chan struct{}),
		conf:      config.GetConfig(),
		receiptCh: make(chan *odysseus.TaskReceipt, 100000),
		poper:     poper,
	}
	node.status = "before running"
	node.register()

	return node
}

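// Start marks the node as running, launches the receipt producer, the
// registry client, and the pprof endpoint, then starts the dispatch routines.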
func (n *Node) Start() error {
	n.status = "running"
	go n.startProducer()
	if n.reg != nil {
		go n.reg.Start()
	}
	// start pprof
	go func() {
		err := http.ListenAndServe("0.0.0.0:6060", nil)
		if err != nil {
			log.WithError(err).Error("pprof failed")
		}
	}()
	return n.startAllTask()
}

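// startProducer keeps an async Kafka producer alive and publishes task
// receipts from receiptCh to the configured receipt topic, recreating the
// producer if the underlying client has been closed.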
func (n *Node) startProducer() {
	brokers := strings.Split(n.conf.Kafka.Brokers, ";")
	for {
		producer, err := utils.NewKafkaProducer(brokers)
		if err != nil {
			log.WithError(err).Error("create kafka producer failed")
			time.Sleep(time.Second)
			continue
		}
		n.kafkaProducer = producer
		running := true
		for running {
			select {
			case <-n.quit:
				return
			case receipt, ok := <-n.receiptCh:
				if !ok {
					return
				}
				if err := utils.FireTaskReceipt(n.kafkaProducer, receipt, config.GetConfig().Kafka.ReceiptTopic); err != nil {
					log.WithError(err).WithField("task-id", receipt.TaskId).Error("fire task receipt failed")
					// the producer client was closed; break out and create a new one
					if errors.Is(err, sarama.ErrClosedClient) {
						running = false
					}
				}
			}
		}
	}
}

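// Stop signals all routines to quit, waits for them to exit, stops the
// registry client, and marks the node as stopped.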
func (n *Node) Stop() {
	close(n.quit)
	n.wg.Wait()
	if n.reg != nil {
		n.reg.Stop()
	}
	n.status = "stopped"
}

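// startAllTask spawns the configured number of dispatch routines.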
func (n *Node) startAllTask() error {
	for i := 0; i < config.GetConfig().Routines; i++ {
		n.wg.Add(1)
		go n.Loop(i)
	}
	return nil
}

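// register creates the registry client for this node; it is started in Start
// and stopped in Stop.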
func (n *Node) register() {
	reg := registry.NewRegistry(registry.RedisConnParam{
		Addr:     n.conf.Redis.Addr,
		Password: n.conf.Redis.Password,
		DbIndex:  n.conf.Redis.DbIndex,
	}, n)
	n.reg = reg
}

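// postResult sends the task response to the task's callback URL and rolls
// back the task fee through the billing cache.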
func (n *Node) postResult(task *odysseus.TaskContent, result *odysseus.TaskResponse) error {
	d, err := proto.Marshal(result)
	if err != nil {
		log.WithError(err).Error("marshal task result failed")
	} else if err = utils.Post(task.TaskCallback, d); err != nil {
		log.WithError(err).Error("post task result failed")
	} else {
		log.WithField("task-id", task.TaskId).Debug("post task result")
	}
	// roll back the fee charged for this task
	uid, _ := strconv.ParseInt(task.TaskUid, 10, 64)
	fee, _ := strconv.ParseInt(task.TaskFee, 10, 64)
	n.cache.RollbackForFee(uid, fee)

	return err
}

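// postReceipt builds a zero-fee receipt for a task that could not be
// dispatched and queues it on receiptCh for the receipt producer.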
func (n *Node) postReceipt(task *odysseus.TaskContent, result *odysseus.TaskResponse, err error) error {
	receipt := &odysseus.TaskReceipt{
		TaskId:              task.TaskId,
		TaskTimestamp:       task.TaskTimestamp,
		TaskType:            task.TaskType,
		TaskUid:             task.TaskUid,
		TaskWorkload:        task.TaskWorkload,
		TaskDuration:        (time.Now().UnixNano() - int64(task.TaskTimestamp)) / 1000,
		TaskFee:             0,
		TaskOutLen:          0,
		TaskExecuteDuration: 0,
		TaskProfitAccount:   "",
		TaskWorkerAccount:   "",
	}
	switch err {
	case types.ErrNoWorker, types.ErrTimeout, ErrDispatchFailed:
		receipt.TaskResult = err.Error()
	default:
		receipt.TaskResult = "internal error"
	}
	n.receiptCh <- receipt
	return nil
}

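// Loop is a single dispatch routine: it attaches a Kafka consumer group and,
// for each incoming task, pops a worker and dispatches the task, retrying
// other workers on failure and reporting a failure receipt and result when no
// worker is available.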
func (n *Node) Loop(idx int) {
	defer n.wg.Done()

	defer log.WithField("routine", idx).Info("node loop routine exit")

	var (
		err     error
		running bool = true
		client  sarama.ConsumerGroup
	)
	// task channel fed by the kafka consumer group
	taskCh := make(chan *odysseus.TaskContent, 1000)
	ctx, cancel := context.WithCancel(context.Background())

	for {
		client, err = attachKafkaConsumer(ctx, taskCh, n)
		if err != nil {
			log.WithError(err).Error("attach kafka consumer failed try again later")
			time.Sleep(time.Second)
			n.status = "attach kafka failed"
			continue
		}
		log.WithField("routine", idx).Info("attach kafka consumer success")
		n.status = "running"
		running = true
		for running {
			select {
			case clientErr := <-client.Errors():
				log.WithError(clientErr).Error("kafka consumer error")
				if errors.Is(clientErr, sarama.ErrClosedClient) {
					running = false
				}
			case <-n.quit:
				cancel()
				client.Close()
				return

			case t := <-taskCh:
				fctx, fcancel := context.WithTimeout(context.Background(), time.Second*time.Duration(config.GetConfig().DispatchTimeout))
				go func(ctx context.Context, task *odysseus.TaskContent) {
					defer fcancel()
					l := log.WithField("task-id", task.TaskId)
					l.WithField("task", task).Info("get task")
					var (
						failed = make(map[string]bool)
						t1     = time.Now()
						stop   = false
						terr   error
					)
					for !stop {
						var worker types.Worker
						worker, terr = n.poper.PopWorker(ctx, n.rdb, task, failed)
						if terr == types.ErrNoWorker || terr == types.ErrTimeout {
							result := &odysseus.TaskResponse{
								TaskId:        task.TaskId,
								TaskUid:       task.TaskUid,
								TaskFee:       task.TaskFee,
								TaskIsSucceed: false,
								TaskError:     terr.Error(),
							}

							if perr := n.postReceipt(task, result, terr); perr != nil {
								l.WithError(perr).Error("post task receipt failed")
							}
							if perr := n.postResult(task, result); perr != nil {
								l.WithError(perr).Error("post task result failed")
							}
							stop = true
							// no worker is available; do not fall through to dispatch
							continue
						}

						if terr != nil {
							l.WithError(terr).Error("pop worker failed")
							continue
						}
						terr = n.DispatchTask(ctx, worker, task)
						if terr != nil {
							l.WithError(terr).Error("dispatch task failed")
							failed[worker.Workerid] = true
							continue
						} else {
							stop = true
						}
					}
					l.WithFields(log.Fields{
						"timecost": time.Now().Sub(t1).Milliseconds(),
						"err":      terr,
					}).Debug("scheduler task finished")
				}(fctx, t)
			}
		}
	}
}

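// attachKafkaConsumer creates a Kafka consumer group over the configured task
// topics and pumps decoded tasks into taskCh until the context is cancelled or
// the group is closed.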
func attachKafkaConsumer(ctx context.Context, taskCh chan *odysseus.TaskContent, n *Node) (sarama.ConsumerGroup, error) {
	kafkaConf := sarama.NewConfig()
	kafkaConf.Consumer.Return.Errors = true
	kafkaConf.Consumer.Group.Rebalance.GroupStrategies = []sarama.BalanceStrategy{sarama.NewBalanceStrategyRoundRobin()}
	//kafkaConf.Consumer.Offsets.Initial = sarama.OffsetOldest
	kafkaConf.Consumer.Offsets.Initial = sarama.OffsetNewest

	// split the broker string into a list
	brokers := strings.Split(n.conf.Kafka.Brokers, ";")

	client, err := sarama.NewConsumerGroup(brokers, "test", kafkaConf)
	if err != nil {
		log.WithError(err).Error("creating consumer group client failed")
		return nil, err
	}

	consumeFunc := func(consumer *Consumer) {
		topics := strings.Split(n.conf.Kafka.TaskTopic, ";")
		for {
			if err := client.Consume(ctx, topics, consumer); err != nil {
				if errors.Is(err, sarama.ErrClosedConsumerGroup) {
					n.status = "kafka consumer closed"
					return
				}
				log.WithError(err).Error("error from consumer")
			}
			// check if context was cancelled, signaling that the consumer should stop
			if ctx.Err() != nil {
				log.WithError(ctx.Err()).Error("consumer kafka context was cancelled")
				return
			}
			consumer.ready = make(chan bool)
		}
	}
	go consumeFunc(&Consumer{
		ready:  make(chan bool),
		taskCh: taskCh,
	})

	return client, nil
}

// Consumer represents a Sarama consumer group consumer
type Consumer struct {
	ready  chan bool
	taskCh chan *odysseus.TaskContent
}

// Setup is run at the beginning of a new session, before ConsumeClaim
func (c *Consumer) Setup(sarama.ConsumerGroupSession) error {
	// Mark the consumer as ready by closing the ready channel
	close(c.ready)
	return nil
}

// Cleanup is run at the end of a session, once all ConsumeClaim goroutines have exited
func (c *Consumer) Cleanup(sarama.ConsumerGroupSession) error {
	return nil
}

// ConsumeClaim must start a consumer loop of ConsumerGroupClaim's Messages().
// Once the Messages() channel is closed, the Handler must finish its processing
// loop and exit.
func (c *Consumer) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
	for {
		select {
		case message, ok := <-claim.Messages():
			if !ok {
				log.Printf("message channel was closed")
				return nil
			}
			var task = new(odysseus.TaskContent)
			if err := proto.Unmarshal(message.Value, task); err != nil {
				log.WithError(err).Error("unmarshal task failed")
				continue
			}

			c.taskCh <- task

			session.MarkMessage(message, "")
		case <-session.Context().Done():
			return nil
		}
	}
}