package utils

import (
	"github.com/IBM/sarama"
	"github.com/gogo/protobuf/proto"
	odysseus "github.com/odysseus/odysseus-protocol/gen/proto/go/base/v1"
	log "github.com/sirupsen/logrus"
)

// NewKafkaProducer creates an asynchronous sarama producer connected to the
// given brokers, using sarama's default configuration.
//
// On success it also starts a background goroutine that drains the producer's
// Errors channel and logs each entry at debug level. That goroutine runs until
// the Errors channel is closed.
//
// It returns the error from sarama.NewAsyncProducer unchanged when the
// producer cannot be created.
func NewKafkaProducer(brokers []string) (sarama.AsyncProducer, error) {
	cfg := sarama.NewConfig()
	// Optional tuning, currently disabled:
	//cfg.Producer.RequiredAcks = sarama.WaitForLocal       // Only wait for the leader to ack
	//cfg.Producer.Compression = sarama.CompressionSnappy   // Compress messages
	//cfg.Producer.Flush.Frequency = 500 * time.Millisecond // Flush batches every 500ms
	//cfg.Net.ResolveCanonicalBootstrapServers = false

	asyncProducer, err := sarama.NewAsyncProducer(brokers, cfg)
	if err != nil {
		return nil, err
	}

	// drainErrors consumes produce failures so they are logged rather than
	// left unread on the Errors channel.
	drainErrors := func() {
		failures := asyncProducer.Errors()
		for {
			produceErr, open := <-failures
			if !open {
				return
			}
			log.WithError(produceErr).Debug("Failed to send log entry to kafka")
		}
	}
	go drainErrors()

	return asyncProducer, nil
}

// FireTaskReceipt protobuf-encodes receipt and pushes it onto the producer's
// input channel as a message for topic.
//
// It returns the marshalling error if encoding fails. A nil return only means
// the message was handed to the producer's input channel; the send on that
// channel blocks until the producer accepts the message.
func FireTaskReceipt(producer sarama.AsyncProducer, receipt *odysseus.TaskReceipt, topic string) error {
	payload, marshalErr := proto.Marshal(receipt)
	if marshalErr != nil {
		return marshalErr
	}

	// No Key is set on the message. A date-based partition key was
	// considered and is left disabled:
	//Key: sarama.StringEncoder(time.Now().Format("2006-01-02")),
	msg := &sarama.ProducerMessage{
		Topic: topic,
		Value: sarama.ByteEncoder(payload),
	}
	producer.Input() <- msg

	return nil
}