package utils import ( "github.com/IBM/sarama" "github.com/gogo/protobuf/proto" odysseus "github.com/odysseus/odysseus-protocol/gen/proto/go/base/v1" log "github.com/sirupsen/logrus" ) // NewKafkaProducer Create a new KafkaProducer. func NewKafkaProducer(brokers []string) (sarama.AsyncProducer, error) { kafkaConfig := sarama.NewConfig() //kafkaConfig.Producer.RequiredAcks = sarama.WaitForLocal // Only wait for the leader to ack //kafkaConfig.Producer.Compression = sarama.CompressionSnappy // Compress messages //kafkaConfig.Producer.Flush.Frequency = 500 * time.Millisecond // Flush batches every 500ms //kafkaConfig.Net.ResolveCanonicalBootstrapServers = false producer, err := sarama.NewAsyncProducer(brokers, kafkaConfig) if err != nil { return nil, err } go func() { for ierr := range producer.Errors() { log.WithError(ierr).Debug("Failed to send log entry to kafka") } }() return producer, nil } func FireTaskReceipt(producer sarama.AsyncProducer, receipt *odysseus.TaskReceipt, topic string) error { // Check time for partition key //var partitionKey sarama.StringEncoder //partitionKey = sarama.StringEncoder(time.Now().Format("2006-01-02")) b, err := proto.Marshal(receipt) if err != nil { return err } value := sarama.ByteEncoder(b) producer.Input() <- &sarama.ProducerMessage{ //Key: partitionKey, Topic: topic, Value: value, } return nil }