Commit cd9b85b1 authored by vicotor's avatar vicotor

update code

parent 115c39f2
...@@ -83,7 +83,7 @@ func (n *Node) Loop(idx int) { ...@@ -83,7 +83,7 @@ func (n *Node) Loop(idx int) {
postReceipt := func(task *odysseus.TaskContent, result *odysseus.TaskResponse, err error) error { postReceipt := func(task *odysseus.TaskContent, result *odysseus.TaskResponse, err error) error {
receipt := new(odysseus.TaskReceipt) receipt := new(odysseus.TaskReceipt)
receipt.TaskUuid = task.TaskUuid receipt.TaskUuid = task.TaskUuid
receipt.TaskFinishTime = uint64(time.Now().Unix()) receipt.TaskFinishTime = uint64(time.Now().UnixNano())
receipt.TaskId = task.TaskId receipt.TaskId = task.TaskId
receipt.TaskUid = task.TaskUid receipt.TaskUid = task.TaskUid
receipt.TaskWorkload = task.TaskWorkload receipt.TaskWorkload = task.TaskWorkload
......
package utils package utils
import ( import (
"fmt"
"github.com/IBM/sarama" "github.com/IBM/sarama"
"github.com/gogo/protobuf/proto" "github.com/gogo/protobuf/proto"
odysseus "github.com/odysseus/odysseus-protocol/gen/proto/go/base/v1" odysseus "github.com/odysseus/odysseus-protocol/gen/proto/go/base/v1"
"time" log "github.com/sirupsen/logrus"
) )
// NewKafkaProducer Create a new KafkaProducer. // NewKafkaProducer Create a new KafkaProducer.
func NewKafkaProducer(brokers []string) (sarama.AsyncProducer, error) { func NewKafkaProducer(brokers []string) (sarama.AsyncProducer, error) {
kafkaConfig := sarama.NewConfig() kafkaConfig := sarama.NewConfig()
kafkaConfig.Producer.RequiredAcks = sarama.WaitForLocal // Only wait for the leader to ack //kafkaConfig.Producer.RequiredAcks = sarama.WaitForLocal // Only wait for the leader to ack
kafkaConfig.Producer.Compression = sarama.CompressionSnappy // Compress messages //kafkaConfig.Producer.Compression = sarama.CompressionSnappy // Compress messages
kafkaConfig.Producer.Flush.Frequency = 500 * time.Millisecond // Flush batches every 500ms //kafkaConfig.Producer.Flush.Frequency = 500 * time.Millisecond // Flush batches every 500ms
kafkaConfig.Net.ResolveCanonicalBootstrapServers = false //kafkaConfig.Net.ResolveCanonicalBootstrapServers = false
producer, err := sarama.NewAsyncProducer(brokers, kafkaConfig) producer, err := sarama.NewAsyncProducer(brokers, kafkaConfig)
...@@ -24,7 +23,7 @@ func NewKafkaProducer(brokers []string) (sarama.AsyncProducer, error) { ...@@ -24,7 +23,7 @@ func NewKafkaProducer(brokers []string) (sarama.AsyncProducer, error) {
go func() { go func() {
for ierr := range producer.Errors() { for ierr := range producer.Errors() {
fmt.Printf("Failed to send log entry to kafka: %v\n", ierr) log.WithError(ierr).Debug("Failed to send log entry to kafka")
} }
}() }()
......
package utils package utils
import ( import (
"fmt"
"github.com/google/uuid" "github.com/google/uuid"
basev1 "github.com/odysseus/odysseus-protocol/gen/proto/go/base/v1" basev1 "github.com/odysseus/odysseus-protocol/gen/proto/go/base/v1"
"strings" "strings"
...@@ -24,6 +25,7 @@ func Test(t *testing.T) { ...@@ -24,6 +25,7 @@ func Test(t *testing.T) {
if err != nil { if err != nil {
t.Fatalf("FireTaskReceipt failed with err:%s", err.Error()) t.Fatalf("FireTaskReceipt failed with err:%s", err.Error())
} }
fmt.Println("send task receipt success")
time.Sleep(time.Second) time.Sleep(time.Second)
} }
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment