Commit 990eb14b authored by vicotor's avatar vicotor

add new kafka for post receipt

parent f01436fb
...@@ -67,7 +67,7 @@ func produceTask() { ...@@ -67,7 +67,7 @@ func produceTask() {
select { select {
case <-ticker.C: case <-ticker.C:
task := makeTask() task := makeTask()
if err := Fire(producer, task, config.GetConfig().Kafka.Topic); err != nil { if err := Fire(producer, task, config.GetConfig().Kafka.TaskTopic); err != nil {
log.WithError(err).Error("fire task failed") log.WithError(err).Error("fire task failed")
} }
} }
......
...@@ -8,6 +8,8 @@ password="123456" ...@@ -8,6 +8,8 @@ password="123456"
db=0 db=0
[kafka] [kafka]
#brokers="192.168.1.108:9092"
#brokers="192.168.1.220:9092" #brokers="192.168.1.220:9092"
brokers="192.168.1.108:9092" brokers="127.0.0.1:9092"
topic="pbaigc" task_topic="pbaigc"
receipt_topic="taskreceipt"
...@@ -12,8 +12,9 @@ type RedisConfig struct { ...@@ -12,8 +12,9 @@ type RedisConfig struct {
DbIndex int `json:"db_index" toml:"db_index"` DbIndex int `json:"db_index" toml:"db_index"`
} }
type KafkaConfig struct { type KafkaConfig struct {
Brokers string `json:"brokers" toml:"brokers"` Brokers string `json:"brokers" toml:"brokers"`
Topic string `json:"topic" toml:"topic"` ReceiptTopic string `json:"receipt_topic" toml:"receipt_topic"`
TaskTopic string `json:"task_topic" toml:"task_topic"`
} }
type Config struct { type Config struct {
......
...@@ -12,13 +12,15 @@ import ( ...@@ -12,13 +12,15 @@ import (
log "github.com/sirupsen/logrus" log "github.com/sirupsen/logrus"
"strings" "strings"
"sync" "sync"
"time"
) )
type Node struct { type Node struct {
rdb *redis.Client rdb *redis.Client
quit chan struct{} quit chan struct{}
conf *config.Config conf *config.Config
wg sync.WaitGroup kafkaProducer sarama.AsyncProducer
wg sync.WaitGroup
} }
func NewNode() *Node { func NewNode() *Node {
...@@ -33,6 +35,8 @@ func NewNode() *Node { ...@@ -33,6 +35,8 @@ func NewNode() *Node {
quit: make(chan struct{}), quit: make(chan struct{}),
conf: config.GetConfig(), conf: config.GetConfig(),
} }
brokers := strings.Split(node.conf.Kafka.Brokers, ";")
node.kafkaProducer, _ = utils.NewKafkaProducer(brokers)
return node return node
} }
...@@ -76,6 +80,29 @@ func (n *Node) Loop(idx int) { ...@@ -76,6 +80,29 @@ func (n *Node) Loop(idx int) {
} }
return err return err
} }
postReceipt := func(task *odysseus.TaskContent, result *odysseus.TaskResponse, err error) error {
receipt := new(odysseus.TaskReceipt)
receipt.TaskUuid = task.TaskUuid
receipt.TaskFinishTime = uint64(time.Now().Unix())
receipt.TaskId = task.TaskId
receipt.TaskUid = task.TaskUid
receipt.TaskWorkload = task.TaskWorkload
receipt.TaskDuration = int64(receipt.TaskFinishTime - task.TaskTimestamp)
receipt.TaskFee = 0
receipt.TaskOutLen = 0
receipt.TaskProfitAccount = ""
receipt.TaskWorkerAccount = ""
switch err {
case ErrNoWorker:
receipt.TaskResult = err.Error()
case ErrDispatchFailed:
receipt.TaskResult = err.Error()
default:
receipt.TaskResult = "internal error"
}
utils.FireTaskReceipt(n.kafkaProducer, receipt, config.GetConfig().Kafka.ReceiptTopic)
return nil
}
for { for {
select { select {
...@@ -96,6 +123,7 @@ func (n *Node) Loop(idx int) { ...@@ -96,6 +123,7 @@ func (n *Node) Loop(idx int) {
TaskIsSucceed: false, TaskIsSucceed: false,
TaskError: err.Error(), TaskError: err.Error(),
} }
postReceipt(task, result, err)
err = postResult(task, result) err = postResult(task, result)
if err != nil { if err != nil {
log.WithError(err).Error("post task result failed") log.WithError(err).Error("post task result failed")
...@@ -137,7 +165,7 @@ func (n *Node) attachKafkaConsumer(ctx context.Context, taskCh chan *odysseus.Ta ...@@ -137,7 +165,7 @@ func (n *Node) attachKafkaConsumer(ctx context.Context, taskCh chan *odysseus.Ta
} }
consumeFunc := func(consumer *Consumer) { consumeFunc := func(consumer *Consumer) {
topics := strings.Split(n.conf.Kafka.Topic, ";") topics := strings.Split(n.conf.Kafka.TaskTopic, ";")
for { for {
if err := client.Consume(ctx, topics, consumer); err != nil { if err := client.Consume(ctx, topics, consumer); err != nil {
if errors.Is(err, sarama.ErrClosedConsumerGroup) { if errors.Is(err, sarama.ErrClosedConsumerGroup) {
......
package utils
import (
"github.com/IBM/sarama"
"github.com/gogo/protobuf/proto"
odysseus "github.com/odysseus/odysseus-protocol/gen/proto/go/base/v1"
"time"
)
// NewKafkaProducer Create a new KafkaProducer.
func NewKafkaProducer(brokers []string) (sarama.AsyncProducer, error) {
kafkaConfig := sarama.NewConfig()
kafkaConfig.Producer.RequiredAcks = sarama.WaitForLocal // Only wait for the leader to ack
kafkaConfig.Producer.Compression = sarama.CompressionSnappy // Compress messages
kafkaConfig.Producer.Return.Errors = false
kafkaConfig.Producer.Flush.Frequency = 500 * time.Millisecond // Flush batches every 500ms
kafkaConfig.Net.ResolveCanonicalBootstrapServers = false
producer, err := sarama.NewAsyncProducer(brokers, kafkaConfig)
if err != nil {
return nil, err
}
go func() {
for _ = range producer.Errors() {
//log.Printf("Failed to send log entry to kafka: %v\n", err)
}
}()
return producer, nil
}
func FireTaskReceipt(producer sarama.AsyncProducer, receipt *odysseus.TaskReceipt, topic string) error {
// Check time for partition key
var partitionKey sarama.StringEncoder
partitionKey = sarama.StringEncoder(time.Now().Format("2006-01-02"))
b, err := proto.Marshal(receipt)
if err != nil {
return err
}
value := sarama.ByteEncoder(b)
producer.Input() <- &sarama.ProducerMessage{
Key: partitionKey,
Topic: topic,
Value: value,
}
return nil
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment