Commit 115c39f2 authored by vicotor's avatar vicotor

update code

parent 990eb14b
...@@ -8,6 +8,7 @@ require ( ...@@ -8,6 +8,7 @@ require (
github.com/astaxie/beego v1.12.3 github.com/astaxie/beego v1.12.3
github.com/ethereum/go-ethereum v1.13.10 github.com/ethereum/go-ethereum v1.13.10
github.com/gogo/protobuf v1.3.2 github.com/gogo/protobuf v1.3.2
github.com/google/uuid v1.4.0
github.com/lestrrat-go/file-rotatelogs v2.4.0+incompatible github.com/lestrrat-go/file-rotatelogs v2.4.0+incompatible
github.com/odysseus/odysseus-protocol v0.0.0-00010101000000-000000000000 github.com/odysseus/odysseus-protocol v0.0.0-00010101000000-000000000000
github.com/prometheus/client_golang v1.18.0 github.com/prometheus/client_golang v1.18.0
......
...@@ -94,6 +94,8 @@ github.com/google/go-cmp v0.4.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/ ...@@ -94,6 +94,8 @@ github.com/google/go-cmp v0.4.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/
github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/go-cmp v0.5.9 h1:O2Tfq5qg4qc4AmwVlvv0oLiVAGB7enBSJ2x2DqQFi38= github.com/google/go-cmp v0.5.9 h1:O2Tfq5qg4qc4AmwVlvv0oLiVAGB7enBSJ2x2DqQFi38=
github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg= github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg=
github.com/google/uuid v1.4.0 h1:MtMxsa51/r9yyhkyLsVeVt0B+BGQZzpQiTQ4eHZ8bc4=
github.com/google/uuid v1.4.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/gorilla/securecookie v1.1.1/go.mod h1:ra0sb63/xPlUeL+yeDciTfxMRAA+MP+HVt/4epWDjd4= github.com/gorilla/securecookie v1.1.1/go.mod h1:ra0sb63/xPlUeL+yeDciTfxMRAA+MP+HVt/4epWDjd4=
github.com/gorilla/sessions v1.2.1/go.mod h1:dk2InVEVJ0sfLlnXv9EAgkf6ecYs/i80K/zI+bUmuGM= github.com/gorilla/sessions v1.2.1/go.mod h1:dk2InVEVJ0sfLlnXv9EAgkf6ecYs/i80K/zI+bUmuGM=
github.com/hashicorp/errwrap v1.0.0 h1:hLrqtEDnRye3+sgx6z4qVLNuviH3MR5aQ0ykNJa/UYA= github.com/hashicorp/errwrap v1.0.0 h1:hLrqtEDnRye3+sgx6z4qVLNuviH3MR5aQ0ykNJa/UYA=
......
package utils package utils
import ( import (
"fmt"
"github.com/IBM/sarama" "github.com/IBM/sarama"
"github.com/gogo/protobuf/proto" "github.com/gogo/protobuf/proto"
odysseus "github.com/odysseus/odysseus-protocol/gen/proto/go/base/v1" odysseus "github.com/odysseus/odysseus-protocol/gen/proto/go/base/v1"
...@@ -10,9 +11,8 @@ import ( ...@@ -10,9 +11,8 @@ import (
// NewKafkaProducer Create a new KafkaProducer. // NewKafkaProducer Create a new KafkaProducer.
func NewKafkaProducer(brokers []string) (sarama.AsyncProducer, error) { func NewKafkaProducer(brokers []string) (sarama.AsyncProducer, error) {
kafkaConfig := sarama.NewConfig() kafkaConfig := sarama.NewConfig()
kafkaConfig.Producer.RequiredAcks = sarama.WaitForLocal // Only wait for the leader to ack kafkaConfig.Producer.RequiredAcks = sarama.WaitForLocal // Only wait for the leader to ack
kafkaConfig.Producer.Compression = sarama.CompressionSnappy // Compress messages kafkaConfig.Producer.Compression = sarama.CompressionSnappy // Compress messages
kafkaConfig.Producer.Return.Errors = false
kafkaConfig.Producer.Flush.Frequency = 500 * time.Millisecond // Flush batches every 500ms kafkaConfig.Producer.Flush.Frequency = 500 * time.Millisecond // Flush batches every 500ms
kafkaConfig.Net.ResolveCanonicalBootstrapServers = false kafkaConfig.Net.ResolveCanonicalBootstrapServers = false
...@@ -23,8 +23,8 @@ func NewKafkaProducer(brokers []string) (sarama.AsyncProducer, error) { ...@@ -23,8 +23,8 @@ func NewKafkaProducer(brokers []string) (sarama.AsyncProducer, error) {
} }
go func() { go func() {
for _ = range producer.Errors() { for ierr := range producer.Errors() {
//log.Printf("Failed to send log entry to kafka: %v\n", err) fmt.Printf("Failed to send log entry to kafka: %v\n", ierr)
} }
}() }()
...@@ -33,8 +33,8 @@ func NewKafkaProducer(brokers []string) (sarama.AsyncProducer, error) { ...@@ -33,8 +33,8 @@ func NewKafkaProducer(brokers []string) (sarama.AsyncProducer, error) {
func FireTaskReceipt(producer sarama.AsyncProducer, receipt *odysseus.TaskReceipt, topic string) error { func FireTaskReceipt(producer sarama.AsyncProducer, receipt *odysseus.TaskReceipt, topic string) error {
// Check time for partition key // Check time for partition key
var partitionKey sarama.StringEncoder //var partitionKey sarama.StringEncoder
partitionKey = sarama.StringEncoder(time.Now().Format("2006-01-02")) //partitionKey = sarama.StringEncoder(time.Now().Format("2006-01-02"))
b, err := proto.Marshal(receipt) b, err := proto.Marshal(receipt)
if err != nil { if err != nil {
...@@ -43,7 +43,7 @@ func FireTaskReceipt(producer sarama.AsyncProducer, receipt *odysseus.TaskReceip ...@@ -43,7 +43,7 @@ func FireTaskReceipt(producer sarama.AsyncProducer, receipt *odysseus.TaskReceip
value := sarama.ByteEncoder(b) value := sarama.ByteEncoder(b)
producer.Input() <- &sarama.ProducerMessage{ producer.Input() <- &sarama.ProducerMessage{
Key: partitionKey, //Key: partitionKey,
Topic: topic, Topic: topic,
Value: value, Value: value,
} }
......
package utils
import (
"github.com/google/uuid"
basev1 "github.com/odysseus/odysseus-protocol/gen/proto/go/base/v1"
"strings"
"testing"
"time"
)
func Test(t *testing.T) {
broker := "192.168.1.220:9092"
brokers := strings.Split(broker, ";")
producer, err := NewKafkaProducer(brokers)
if err != nil {
t.Fatalf("NewKafkaProducer failed with err:%s", err.Error())
}
for i := 0; i < 100; i++ {
taskReceipt := &basev1.TaskReceipt{
TaskUid: uuid.NewString(),
}
err = FireTaskReceipt(producer, taskReceipt, "taskreceipt")
if err != nil {
t.Fatalf("FireTaskReceipt failed with err:%s", err.Error())
}
time.Sleep(time.Second)
}
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment