Commit bee29b58 authored by vicotor's avatar vicotor

fix bug for kafka

parent 89b1b0ce
...@@ -4,17 +4,16 @@ import ( ...@@ -4,17 +4,16 @@ import (
"github.com/IBM/sarama" "github.com/IBM/sarama"
"github.com/gogo/protobuf/proto" "github.com/gogo/protobuf/proto"
odysseus "github.com/odysseus/odysseus-protocol/gen/proto/go/base/v1" odysseus "github.com/odysseus/odysseus-protocol/gen/proto/go/base/v1"
"time" log "github.com/sirupsen/logrus"
) )
// NewKafkaProducer Create a new KafkaProducer. // NewKafkaProducer Create a new KafkaProducer.
func NewKafkaProducer(brokers []string) (sarama.AsyncProducer, error) { func NewKafkaProducer(brokers []string) (sarama.AsyncProducer, error) {
kafkaConfig := sarama.NewConfig() kafkaConfig := sarama.NewConfig()
kafkaConfig.Producer.RequiredAcks = sarama.WaitForLocal // Only wait for the leader to ack //kafkaConfig.Producer.RequiredAcks = sarama.WaitForLocal // Only wait for the leader to ack
kafkaConfig.Producer.Compression = sarama.CompressionSnappy // Compress messages //kafkaConfig.Producer.Compression = sarama.CompressionSnappy // Compress messages
kafkaConfig.Producer.Return.Errors = false //kafkaConfig.Producer.Flush.Frequency = 500 * time.Millisecond // Flush batches every 500ms
kafkaConfig.Producer.Flush.Frequency = 500 * time.Millisecond // Flush batches every 500ms //kafkaConfig.Net.ResolveCanonicalBootstrapServers = false
kafkaConfig.Net.ResolveCanonicalBootstrapServers = false
producer, err := sarama.NewAsyncProducer(brokers, kafkaConfig) producer, err := sarama.NewAsyncProducer(brokers, kafkaConfig)
...@@ -23,8 +22,8 @@ func NewKafkaProducer(brokers []string) (sarama.AsyncProducer, error) { ...@@ -23,8 +22,8 @@ func NewKafkaProducer(brokers []string) (sarama.AsyncProducer, error) {
} }
go func() { go func() {
for _ = range producer.Errors() { for ierr := range producer.Errors() {
//log.Printf("Failed to send log entry to kafka: %v\n", err) log.WithError(ierr).Debug("Failed to send log entry to kafka")
} }
}() }()
...@@ -33,8 +32,8 @@ func NewKafkaProducer(brokers []string) (sarama.AsyncProducer, error) { ...@@ -33,8 +32,8 @@ func NewKafkaProducer(brokers []string) (sarama.AsyncProducer, error) {
func FireTaskReceipt(producer sarama.AsyncProducer, receipt *odysseus.TaskReceipt, topic string) error { func FireTaskReceipt(producer sarama.AsyncProducer, receipt *odysseus.TaskReceipt, topic string) error {
// Check time for partition key // Check time for partition key
var partitionKey sarama.StringEncoder //var partitionKey sarama.StringEncoder
partitionKey = sarama.StringEncoder(time.Now().Format("2006-01-02")) //partitionKey = sarama.StringEncoder(time.Now().Format("2006-01-02"))
b, err := proto.Marshal(receipt) b, err := proto.Marshal(receipt)
if err != nil { if err != nil {
...@@ -43,7 +42,7 @@ func FireTaskReceipt(producer sarama.AsyncProducer, receipt *odysseus.TaskReceip ...@@ -43,7 +42,7 @@ func FireTaskReceipt(producer sarama.AsyncProducer, receipt *odysseus.TaskReceip
value := sarama.ByteEncoder(b) value := sarama.ByteEncoder(b)
producer.Input() <- &sarama.ProducerMessage{ producer.Input() <- &sarama.ProducerMessage{
Key: partitionKey, //Key: partitionKey,
Topic: topic, Topic: topic,
Value: value, Value: value,
} }
......
package utils
import (
"fmt"
"github.com/google/uuid"
basev1 "github.com/odysseus/odysseus-protocol/gen/proto/go/base/v1"
"strings"
"testing"
"time"
)
func Test(t *testing.T) {
broker := "192.168.1.220:9092"
brokers := strings.Split(broker, ";")
producer, err := NewKafkaProducer(brokers)
if err != nil {
t.Fatalf("NewKafkaProducer failed with err:%s", err.Error())
}
for i := 0; i < 100; i++ {
taskReceipt := &basev1.TaskReceipt{
TaskUid: uuid.NewString(),
}
err = FireTaskReceipt(producer, taskReceipt, "taskreceipt")
if err != nil {
t.Fatalf("FireTaskReceipt failed with err:%s", err.Error())
}
fmt.Println("send task receipt success")
time.Sleep(time.Second)
}
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment