Commit f043b95f authored by Your Name's avatar Your Name

kafka consumer instance

parent e1769f5f
......@@ -101,6 +101,7 @@ func kafkaConsumerBytes(done chan interface{}, req, resTopic string) (chan pbUps
resOutStream := make(chan pbUpstream.TaskReceipt, 1000)
go func() {
// Create a new Sarama configuration for the Kafka producer.
config := sarama.NewConfig()
......@@ -123,19 +124,6 @@ func kafkaConsumerBytes(done chan interface{}, req, resTopic string) (chan pbUps
// Ensure the partition consumer is closed when the function ends (deferred execution).
defer reqConsumer.Close()
resConsumer, err := consumer.ConsumePartition(resTopic, 0, sarama.OffsetNewest)
if err != nil {
log.Fatal("Failed to start partition consumer:", err)
}
// Ensure the partition consumer is closed when the function ends (deferred execution).
defer resConsumer.Close()
// Signal that the consumer goroutine is ready
//wg.Done()
// Infinite loop to continuously listen for messages from the partitionConsumer.Messages() channel.
for {
select {
case message := <-reqConsumer.Messages():
......@@ -164,6 +152,49 @@ func kafkaConsumerBytes(done chan interface{}, req, resTopic string) (chan pbUps
return
case reqOutStream <- pbMsg:
}
}
}
}()
go func() {
// Create a new Sarama configuration for the Kafka producer.
config := sarama.NewConfig()
// Create a new Kafka consumer using the specified configuration and broker addresses.
consumer, err := sarama.NewConsumer(kafkaBrokers, config)
if err != nil {
log.Fatal("Failed to start Kafka consumer:", err)
}
// Ensure the Kafka consumer is closed when the function ends (deferred execution).
defer consumer.Close()
// Create a partition consumer for the specified topic, partition, and starting offset.
// // The starting offset is set to sarama.OffsetNewest, which means the consumer will start consuming messages from the latest available offset.
// reqConsumer, err := consumer.ConsumePartition(req, 0, sarama.OffsetNewest)
// if err != nil {
// log.Fatal("Failed to start partition consumer:", err)
// }
// // Ensure the partition consumer is closed when the function ends (deferred execution).
// defer reqConsumer.Close()
resConsumer, err := consumer.ConsumePartition(resTopic, 0, sarama.OffsetNewest)
if err != nil {
log.Fatal("Failed to start partition consumer:", err)
}
// Ensure the partition consumer is closed when the function ends (deferred execution).
defer resConsumer.Close()
// Signal that the consumer goroutine is ready
//wg.Done()
// Infinite loop to continuously listen for messages from the partitionConsumer.Messages() channel.
for {
select {
case message := <-resConsumer.Messages():
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment