Commit 076a0de5 authored by Your Name

use slog

parent 7cace2e1
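This commit replaces ad-hoc `fmt.Println`/`log` calls with structured logging via `log/slog` (standard library since Go 1.21) and swaps `log.Fatal` for `panic` so that deferred cleanup still runs. As a reference for reading the hunks below, here is a minimal, self-contained sketch of the slog pattern the code adopts; the topic and credential values are illustrative, not from the repo:

```go
package main

import (
	"log/slog"
	"os"
)

func main() {
	// Default logger: text output on stderr, Debug level and above,
	// mirroring the slog.SetDefault call this commit adds to main().
	slog.SetDefault(slog.New(slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{
		Level: slog.LevelDebug,
	})))

	// A message followed by alternating key/value pairs.
	slog.Info("kafka consumer", "topic", "task-requests", "pbMsg.TaskId", "42")

	// Grouped attributes render as dotted keys: redis.Addr=... redis.PassWd=...
	slog.Warn("start param", "redis", slog.GroupValue(
		slog.String("Addr", "127.0.0.1:6379"),
		slog.String("PassWd", "example"),
	))
}
```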
go.mod:

 module github.com/odysseus/go-kafka

-go 1.21.3
+go 1.21.7

 replace github.com/odysseus/odysseus-protocol => ../odysseus-protocol
@@ -5,10 +5,10 @@ import (
 	"encoding/json"
 	"fmt"
 	"io"
-	"log"
-	"math/rand"
+	"log/slog"
 	"net/http"
 	"net/url"
+	"os"
 	"strconv"
 	"strings"
 	"sync"
@@ -72,8 +72,9 @@ func kafkaProducerBytes() {
 	// Create a new Kafka producer using the specified configuration and broker addresses.
 	producer, err := sarama.NewAsyncProducer(kafkaBrokers, config)
 	if err != nil {
-		log.Fatal("Failed to start Kafka producer:", err)
+		panic(fmt.Sprint("Failed to start Kafka producer:", err))
 	}
 	// Ensure the Kafka producer is closed when the function ends (deferred execution).
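A note on the `log.Fatal` → `panic` swap in this and the following hunks: `log.Fatal` exits via `os.Exit(1)`, which skips deferred calls such as the `defer producer.Close()` right after this check, whereas a panic unwinds the stack and runs them. A tiny illustrative program (not from the repo):

```go
package main

import "fmt"

func main() {
	// With panic, this deferred cleanup still runs before the process dies;
	// log.Fatal would call os.Exit(1) and skip it.
	defer fmt.Println("cleanup ran")

	panic("Failed to start Kafka producer: broker unreachable")
}
```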
@@ -107,8 +108,9 @@ func kafkaConsumerBytes(done chan interface{}, req, resTopic string) (chan pbUps
 	// Create a new Kafka consumer using the specified configuration and broker addresses.
 	consumer, err := sarama.NewConsumer(kafkaBrokers, config)
 	if err != nil {
-		log.Fatal("Failed to start Kafka consumer:", err)
+		panic(fmt.Sprint("Failed to start Kafka consumer:", err))
 	}
 	// Ensure the Kafka consumer is closed when the function ends (deferred execution).

@@ -117,8 +119,9 @@ func kafkaConsumerBytes(done chan interface{}, req, resTopic string) (chan pbUps
 	// Create a partition consumer for the specified topic, partition, and starting offset.
 	// The starting offset is set to sarama.OffsetNewest, which means the consumer will start consuming messages from the latest available offset.
 	reqConsumer, err := consumer.ConsumePartition(req, 0, sarama.OffsetNewest)
 	if err != nil {
-		log.Fatal("Failed to start partition consumer:", err)
+		panic(fmt.Sprint("Failed to start partition consumer:", err))
 	}
 	// Ensure the partition consumer is closed when the function ends (deferred execution).
@@ -129,23 +132,23 @@ func kafkaConsumerBytes(done chan interface{}, req, resTopic string) (chan pbUps
 		case message := <-reqConsumer.Messages():
 			if message == nil {
-				fmt.Println("reqConsumer message is nil ")
+				slog.Error("kafka consumer", "topic", req, "message", "is nil")
 				continue
 			}
 			if message.Value == nil {
-				fmt.Println("reqConsumer message.Value is nil ")
+				slog.Error("kafka consumer", "topic", req, "message value", "is nil")
 				continue
 			}
 			pbMsg := pbUpstream.TaskContent{}
 			if err := gogoPbProto.Unmarshal(message.Value, &pbMsg); err != nil {
-				fmt.Println("kafkaConsumerBytes gogoPbProto.Unmarshal", err.Error())
+				slog.Error("kafka consumer", "topic", req, "gogoPbProto.Unmarshal err", err.Error())
 				continue
 			}
-			fmt.Println("reqConsumer msg", pbMsg.TaskId)
+			slog.Info("kafka consumer", "topic", req, "pbMsg.TaskId", pbMsg.TaskId)
 			select {
 			case <-done:
@@ -164,7 +167,7 @@ func kafkaConsumerBytes(done chan interface{}, req, resTopic string) (chan pbUps
 	// Create a new Kafka consumer using the specified configuration and broker addresses.
 	consumer, err := sarama.NewConsumer(kafkaBrokers, config)
 	if err != nil {
-		log.Fatal("Failed to start Kafka consumer:", err)
+		panic(fmt.Sprint("Failed to start Kafka consumer:", err))
 	}
 	// Ensure the Kafka consumer is closed when the function ends (deferred execution).

@@ -181,8 +184,9 @@ func kafkaConsumerBytes(done chan interface{}, req, resTopic string) (chan pbUps
 	// defer reqConsumer.Close()
 	resConsumer, err := consumer.ConsumePartition(resTopic, 0, sarama.OffsetNewest)
 	if err != nil {
-		log.Fatal("Failed to start partition consumer:", err)
+		panic(fmt.Sprint("Failed to start partition consumer:", err))
 	}
 	// Ensure the partition consumer is closed when the function ends (deferred execution).
@@ -199,22 +203,23 @@ func kafkaConsumerBytes(done chan interface{}, req, resTopic string) (chan pbUps
 		case message := <-resConsumer.Messages():
 			if message == nil {
-				fmt.Println("resConsumer message is nil ")
+				slog.Error("kafka consumer", "topic", resTopic, "message", "is nil")
 				continue
 			}
 			if message.Value == nil {
-				fmt.Println("resConsumer message.Value is nil ")
+				slog.Error("kafka consumer", "topic", resTopic, "message value", "is nil")
 				continue
 			}
 			pbMsg := pbUpstream.TaskReceipt{}
 			if err := gogoPbProto.Unmarshal(message.Value, &pbMsg); err != nil {
-				fmt.Println("kafkaConsumerBytes gogoPbProto.Unmarshal", err.Error())
+				slog.Error("kafka consumer", "topic", resTopic, "gogoPbProto.Unmarshal err", err.Error())
 				continue
 			}
-			fmt.Println("resConsumer msg", pbMsg.TaskId, "len(resOutStream)", len(resOutStream))
+			slog.Info("kafka consumer", "topic", resTopic, "pbMsg.TaskId", pbMsg.TaskId)
 			select {
 			case <-done:
@@ -242,7 +247,7 @@ func batchToQuestDb(done chan interface{}, reqStream chan pbUpstream.TaskContent
 	sender, err := qdb.NewLineSender(ctx, addrOpt)
 	if err != nil {
-		log.Fatal(err)
+		panic(err)
 	}
 	// Make sure to close the sender on exit to release resources.
 	defer sender.Close()

@@ -254,7 +259,7 @@ func batchToQuestDb(done chan interface{}, reqStream chan pbUpstream.TaskContent
 			nanoseconds := int64(task.TaskTimestamp)
 			seconds := nanoseconds / 1e9
-			fmt.Println("questdb <- resStream", "task.TaskId", task.TaskId, "task.TaskUid", task.TaskUid)
+			slog.Debug("questdb <- resStream", "task.TaskId", task.TaskId, "task.TaskUid", task.TaskUid)
 			sender.Table(resTableName).
 				Symbol("type", fmt.Sprintf("%d", task.TaskType)).

@@ -274,8 +279,7 @@ func batchToQuestDb(done chan interface{}, reqStream chan pbUpstream.TaskContent
 			err = sender.Flush(ctx)
 			if err != nil {
-				fmt.Println("task := <-resStream sender.Flush", err.Error())
+				slog.Error("task := <-resStream", "error", err.Error())
 				//log.Fatal(err)
 			}
@@ -294,11 +298,11 @@ func batchToQuestDb(done chan interface{}, reqStream chan pbUpstream.TaskContent
 			_ = task
 			taskFeeAsInt, err := strconv.Atoi(task.TaskFee)
 			if err != nil {
-				fmt.Printf("task.TaskFee string to int error: %v\n", err)
+				slog.Error("task.TaskFee string to int", "error", err)
 				continue
 			}
-			fmt.Println("questdb <- reqStream", "task.TaskId", task.TaskId, "task.TaskUid", task.TaskUid)
+			slog.Debug("questdb <- reqStream", "task.TaskId", task.TaskId, "task.TaskUid", task.TaskUid)
 			nanoseconds := int64(task.TaskTimestamp)
 			seconds := nanoseconds / 1e9

@@ -319,7 +323,8 @@ func batchToQuestDb(done chan interface{}, reqStream chan pbUpstream.TaskContent
 			err = sender.Flush(ctx)
 			if err != nil {
-				fmt.Println("task := <-reqStream sender.Flush", err.Error())
+				slog.Error("task := <-reqStream", "error", err.Error())
+				//fmt.Println("task := <-reqStream sender.Flush", err.Error())
 				//log.Fatal(err)
 			}
@@ -342,9 +347,6 @@ type ResponseJson struct {
 func newCache(redisAddr, redisPass, mysqlIP, dbName, user, passwd string, port int) *cachedata.CacheData {
-	fmt.Println("mysqlIP-------------", mysqlIP)
-	fmt.Println("redisAddr-------", redisAddr)
 	_cache := cachedata.NewCacheData(context.Background(), cachedata.RedisConnParam{
 		//Addr: "192.168.1.10:6379",
 		Addr: redisAddr,
@@ -413,7 +415,8 @@ func createTable(ipAddr, tableSql string) {
 	body, err := io.ReadAll(res.Body)
 	checkErr(err)
-	log.Println(string(body))
+	slog.Info("createTable", "response body", body)
 }
func checkErr(err error) { func checkErr(err error) {
...@@ -424,6 +427,11 @@ func checkErr(err error) { ...@@ -424,6 +427,11 @@ func checkErr(err error) {
func main() { func main() {
slog.SetDefault(slog.New(slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{
Level: slog.LevelDebug,
//AddSource: true,
})))
var questAddr, kafkaBroker, callbackAddrP, listenIpPort, aigcProduceTopic, aigcConsumerTopic string var questAddr, kafkaBroker, callbackAddrP, listenIpPort, aigcProduceTopic, aigcConsumerTopic string
var redisAddr, redisPassWd, mysqlAddr string var redisAddr, redisPassWd, mysqlAddr string
@@ -451,20 +459,14 @@ func main() {
 	flag.Parse()
-	fmt.Println("questAddr|", questAddr)
-	fmt.Println("kafkaBroker|", kafkaBroker)
-	fmt.Println("callbackIpAddr|", callbackAddrP)
-	fmt.Println("listenIpPort|", listenIpPort)
-	fmt.Println("aigcProduceTopic|", aigcProduceTopic)
-	fmt.Println("aigcConsumerTopic|", aigcConsumerTopic)
-	fmt.Println("redisAddr|", redisAddr)
-	fmt.Println("redisPassWd|", redisPassWd)
-	fmt.Println("mysqlAddr|", mysqlAddr)
-	fmt.Println("mysqlPort|", mysqlPort)
-	fmt.Println("mysqlDbName|", mysqlDbName)
-	fmt.Println("mysqlUser|", mysqlUser)
-	fmt.Println("mysqlPassWd|", mysqlPassWd)
+	slog.Warn("start param", "quest", slog.GroupValue(slog.String("Addr", questAddr)))
+	slog.Warn("start param", "kafka", slog.GroupValue(slog.String("Addr", kafkaBroker)), slog.Group("topic", "aigcProduceTopic", aigcProduceTopic, "aigcConsumerTopic", aigcConsumerTopic))
+	slog.Info("start param", "callback", slog.GroupValue(slog.String("Addr", callbackAddrP)))
+	slog.Warn("start param", "listenIpPort", slog.GroupValue(slog.String("Addr", listenIpPort)))
+	slog.Warn("start param", "redis", slog.GroupValue(slog.String("Addr", redisAddr), slog.String("PassWd", redisPassWd)))
+	slog.Warn("start param", "mysql", slog.GroupValue(slog.String("Addr", mysqlAddr),
+		slog.Int("port", mysqlPort), slog.String("DbName", mysqlDbName), slog.String("User", mysqlUser), slog.String("PassWd", mysqlPassWd))) //, slog.GroupValue(slog.Int("port",mysqlPort),slog.String("mysqlDbName",mysqlDbName),slog.GroupValue(slog.String("mysqlUser",mysqlUser)))))
 	kafkaBrokers = []string{kafkaBroker}
 	callbackAddr = callbackAddrP
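For readers new to slog: with the `TextHandler` configured above, group members flatten into dotted keys, so the redis line prints roughly as follows (timestamp and values illustrative):

```
time=2024-02-02T03:13:33+08:00 level=WARN msg="start param" redis.Addr=127.0.0.1:6379 redis.PassWd=example
```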
@@ -472,8 +474,7 @@ func main() {
 	idx := strings.Index(questAddr, ":")
 	if idx == -1 {
-		fmt.Println("please use the format: 1.1.1.1:9009")
-		return
+		panic("quest addr parameter, please use the format: 1.1.1.1:9009")
 	}
 	createTable(questAddr[:idx], createTaskTableSql)
@@ -528,27 +529,44 @@ func main() {
 	callbackGroupV1 := callbackGroup.Group("/v1")
 	callbackGroupV1.Post("/", func(c *fiber.Ctx) error {
-		fmt.Println("c.Route().Path", c.Route().Path)
+		slog.Debug("callback", "path", c.Route().Path)
+		//fmt.Println("c.Route().Path", c.Route().Path)
 		body := c.Body()
 		var resbody pbUpstream.TaskResponse
 		if err := gogoPbProto.Unmarshal(body, &resbody); err != nil {
-			return c.SendString(fmt.Sprintf("Unmarshal error %v", err.Error()))
+			return c.SendString(fmt.Sprintf("callback Unmarshal error %v", err.Error()))
 		}
+		slog.Debug("callback", "task-id", resbody.TaskId, "result", resbody.TaskIsSucceed, "uid", resbody.TaskUid)
 		res(resbody)
 		return c.SendStatus(200)
 	})
-	log.Fatal(app.Listen(listenIpPort))
+	var taskJsonStr = `{"id":10,"desc":"测试新增","price":1000,"complexity":1,"hardware_require":"{ disk_size: 100, memory_size: 100 }","image_id":"13221312","image_url":"dasdasdasd","cmd":"{\"image_name\":\"demianhjw/aigic:0129\",\"docker_cmd\":{\"container_port\":\"5001\"},\"api_url\":\"http://127.0.0.1:%d/aigic\"}","workload":100,"api_path":"/demianhjw/aigic/0129","image_name":"demianhjw/aigic:0129","sign_url":"dsdsdsds","username":"sdsds","password":"sdsdsd","created_time":"2024-02-02T03:13:33+08:00","updated_time":"2024-02-02T03:13:33+08:00","deleted":0}`
+
+	task := model.TaskType{}
+	if err := json.Unmarshal([]byte(taskJsonStr), &task); err != nil {
+		panic(fmt.Sprintf("json.Unmarshal error, %s", err.Error()))
+	}
+
+	replanceQueryTask = &task
+
+	panic(app.Listen(listenIpPort))
 }
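The hard-coded `taskJsonStr` fixture above, together with `replanceQueryTask`, short-circuits the database lookup in `ApiAndJWT` below (the `if true` branch), apparently for testing. The repo's `model.TaskType` is not shown in this diff; a hypothetical sketch of the fields the fixture implies might look like this (names and types are assumptions derived from the JSON keys, not the actual definition):

```go
package model

// Hypothetical reconstruction from the JSON fixture; the real
// model.TaskType lives elsewhere in the repo and may differ
// (e.g. the *_time fields could be time.Time instead of string).
type TaskType struct {
	ID              int64  `json:"id"`
	Desc            string `json:"desc"`
	Price           int64  `json:"price"`
	Complexity      int    `json:"complexity"`
	HardwareRequire string `json:"hardware_require"`
	ImageID         string `json:"image_id"`
	ImageURL        string `json:"image_url"`
	Cmd             string `json:"cmd"`
	Workload        int    `json:"workload"`
	APIPath         string `json:"api_path"`
	ImageName       string `json:"image_name"`
	SignURL         string `json:"sign_url"`
	Username        string `json:"username"`
	Password        string `json:"password"`
	CreatedTime     string `json:"created_time"`
	UpdatedTime     string `json:"updated_time"`
	Deleted         int    `json:"deleted"`
}
```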
+var replanceQueryTask *model.TaskType
+
 func ApiAndJWT(c *fiber.Ctx) error {
-	fmt.Println("c.Route().Path", c.Route().Path)
+	slog.Debug("ApiAndJWT", "path", c.Route().Path)
 	routePath := c.Route().Path
 	routePathWithoutStar := strings.TrimSuffix(routePath, "/*")

@@ -560,10 +578,6 @@ func ApiAndJWT(c *fiber.Ctx) error {
 	uid := reqHeaders["X-Consumer-Custom-Id"]
-	if sync, ok := reqHeaders["sync"]; ok {
-		fmt.Println("sync-----------------sync", sync)
-	}
 	if uid == nil {
 		return c.SendString(fmt.Sprintf("uid can not be nil"))
 	}
@@ -574,22 +588,29 @@ func ApiAndJWT(c *fiber.Ctx) error {
 	uidAsInt, err := strconv.Atoi(uid[0])
-	fmt.Println("req pathInDB, int64(uidAsInt)", pathInDB, int64(uidAsInt))
+	slog.Info("query db param", "pathInDB", pathInDB, "uid", int64(uidAsInt))

-	task, err := cache.Query(pathInDB, int64(uidAsInt))
-	if err != nil {
-		return c.SendString(fmt.Sprintf("cache.Query %v", err.Error()))
-	}
+	var task *model.TaskType
+	if true {
+		task = replanceQueryTask
+	} else {
+		var err error
+		task, err = cache.Query(pathInDB, int64(uidAsInt))
+		if err != nil {
+			return c.SendString(fmt.Sprintf("cache.Query %v", err.Error()))
+		}
+	}

-	var cmd string
-	cmd = task.Cmd
+	//var cmd string
+	//cmd = task.Cmd

 	pbMsg := pbUpstream.TaskContent{
 		TaskId:        reqHeaders["Task-Id"][0],
 		TaskType:      uint64(task.ID),
 		TaskKind:      1,
-		TaskCmd:       cmd,
+		TaskCmd:       task.Cmd,
 		TaskParam:     c.Body(), //[]byte(reqHeaders["Task-Id"][0]),
 		TaskTimestamp: uint64(time.Now().UnixNano()),
 		TaskCallback:  "http://" + callbackAddr + "/callback/v1", //"http://192.168.1.10:6001/callback/v1",
@@ -603,16 +624,16 @@ func ApiAndJWT(c *fiber.Ctx) error {
 	msgAsJson, err := json.Marshal(pbMsg)
 	if err != nil {
-		fmt.Println("json.Marshal error", err.Error())
+		slog.Error("pbUpstream.TaskContent", "Marshal", err.Error())
 		return c.SendString(fmt.Sprintf("json.Marshal %v", err.Error()))
 	}
-	fmt.Println("msgAsJson to kafka", string(msgAsJson))
+	slog.Debug("msgAsJson to kafka", "asjson", string(msgAsJson))
 	pbBytes, err := gogoPbProto.Marshal(&pbMsg)
 	if err != nil {
-		fmt.Println("pb error", err.Error())
+		slog.Error("pbUpstream.TaskContent", "gogoPbProto.Marshal", err.Error())
 		return c.SendString(fmt.Sprintf("pb error: %v", err.Error()))
 	}
@@ -620,40 +641,33 @@ func ApiAndJWT(c *fiber.Ctx) error {
 		Bytes: pbBytes,
 	}

-	fmt.Println("pbMsg.TaskUid--------------", pbMsg.TaskId)
-
-	wait := req(pbMsg.TaskId)
-	resAsPb := <-wait
-
-	fmt.Println("resAsPb.TaskResultHeader", string(resAsPb.TaskResultHeader), len(resAsPb.TaskResultHeader))
-
-	if resAsPb.TaskResultHeader != nil {
-		if len(resAsPb.TaskResultHeader) != 0 {
-			headers := make(map[string][]string)
-
-			if err := json.Unmarshal(resAsPb.TaskResultHeader, &headers); err != nil {
-				fmt.Println("json.Unmarshal(resAsPb.TaskResultHeader--------------err", err.Error())
-				return c.SendString(fmt.Sprintf("json.Unmarshal(resAsPb.TaskResultHeader--------------err %v", err.Error()))
-			}
-
-			for k, vs := range headers {
-				for _, v := range vs {
-					if k == "Content-Encoding" {
-						c.Response().Header.SetContentEncoding(v)
-					}
-					if k == "Content-Type" {
-						c.Response().Header.SetContentType(v)
-					}
-				}
-			}
-		}
-	}
+	//fmt.Println("pbMsg.TaskUid--------------", pbMsg.TaskId)
+
+	asyncMode := false
+
+	if headerSync, ok := reqHeaders["Async"]; ok {
+		//fmt.Println("sync-----------------sync", headerSync)
+		for _, syncAsString := range headerSync {
+			if syncAsString == "true" {
+				asyncMode = true
+				break
+			}
+		}
+	}
+
+	slog.Info("task info", "taskid", pbMsg.TaskId, "asyncMode", asyncMode)
+
+	//fmt.Println("asyncMode-----------", asyncMode)
+
+	if asyncMode {
+		time.Sleep(10 * time.Second)
+		return c.SendString(pbMsg.TaskId)
+	} else {
+		return syncModeF(c, pbMsg.TaskId)
+	}
+
+	//fmt.Println("syncMode-------------", syncMode)

 	// resAsJson := ResponseJson{
 	// 	TaskId:     resAsPb.TaskId,
 	// 	TaskResult: resAsPb.GetTaskResultBody(),
@@ -663,97 +677,135 @@ func ApiAndJWT(c *fiber.Ctx) error {
 	// 	TaskError: resAsPb.TaskError,
 	// }
-	return c.Send(resAsPb.GetTaskResultBody())
+	//return c.Send(resAsPb.GetTaskResultBody())
 	//return c.JSON(resAsJson)
 }
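The new `Async` header selects between the two response modes: when a client sends `Async: true`, the handler acknowledges with just the task id (after the fixed 10 s pause above); otherwise it falls through to `syncModeF` below and blocks until the result arrives via the callback route. A hypothetical client call, where the endpoint, port, and header values are assumptions, not taken from the repo:

```go
package main

import (
	"fmt"
	"io"
	"net/http"
	"strings"
)

func main() {
	req, err := http.NewRequest(http.MethodPost,
		"http://127.0.0.1:6001/demianhjw/aigic/0129", // assumed listen address and route
		strings.NewReader(`{"prompt":"hello"}`))
	if err != nil {
		panic(err)
	}
	req.Header.Set("Async", "true")            // fire-and-forget: response body is the task id
	req.Header.Set("Task-Id", "42")            // the handler reads this header
	req.Header.Set("X-Consumer-Custom-Id", "7") // uid, normally injected by the API gateway

	resp, err := http.DefaultClient.Do(req)
	if err != nil {
		panic(err)
	}
	defer resp.Body.Close()

	body, _ := io.ReadAll(resp.Body)
	fmt.Println("task id:", string(body))
}
```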
-/////////////////////////////
-//
-//
-////////////////////////////
-func kafkaProducer() {
-	// Create a new Sarama configuration for the Kafka producer.
-	config := sarama.NewConfig()
-	// Create a new Kafka producer using the specified configuration and broker addresses.
-	producer, err := sarama.NewAsyncProducer(kafkaBrokers, config)
-	if err != nil {
-		log.Fatal("Failed to start Kafka producer:", err)
-	}
-	// Ensure the Kafka producer is closed when the function ends (deferred execution).
-	defer producer.Close()
-	for message := range producerMessages {
-		counterStr := fmt.Sprintf("%d", counter)
-		// Get the Indian Standard Time (IST) location
-		istLocation, err := time.LoadLocation("Asia/Kolkata")
-		if err != nil {
-			log.Fatal("Failed to load IST location:", err)
-		}
-		// Convert current time to IST
-		istTime := time.Now().In(istLocation).Format("02-01-2006 15:04:05")
-		value := fmt.Sprintf("(%s, %s, %s)", counterStr, istTime, message)
-		producer.Input() <- &sarama.ProducerMessage{
-			Topic: topic,
-			Value: sarama.StringEncoder(value),
-		}
-		fmt.Printf("Message sent: %s\n", value)
-		counter++
-		// Introduce random delay between 1 to 3 seconds for message push
-		time.Sleep(time.Duration(1+rand.Intn(3)) * time.Second)
-	}
-}
-func kafkaConsumer(wg *sync.WaitGroup) {
-	// Create a new Sarama configuration for the Kafka producer.
-	config := sarama.NewConfig()
-	// Create a new Kafka consumer using the specified configuration and broker addresses.
-	consumer, err := sarama.NewConsumer(kafkaBrokers, config)
-	if err != nil {
-		log.Fatal("Failed to start Kafka consumer:", err)
-	}
-	// Ensure the Kafka consumer is closed when the function ends (deferred execution).
-	defer consumer.Close()
-	// Create a partition consumer for the specified topic, partition, and starting offset.
-	// The starting offset is set to sarama.OffsetNewest, which means the consumer will start consuming messages from the latest available offset.
-	partitionConsumer, err := consumer.ConsumePartition(topic, 0, sarama.OffsetNewest)
-	if err != nil {
-		log.Fatal("Failed to start partition consumer:", err)
-	}
-	// Ensure the partition consumer is closed when the function ends (deferred execution).
-	defer partitionConsumer.Close()
-	// Signal that the consumer goroutine is ready
-	defer wg.Done()
-	// Infinite loop to continuously listen for messages from the partitionConsumer.Messages() channel.
-	for {
-		select {
-		case message := <-partitionConsumer.Messages():
-			value := string(message.Value)
-			fmt.Printf("Received message from Kafka: %s\n", value)
-			// Acquire the mutex before appending to the messages slice to ensure concurrent-safe access.
-			mutex.Lock()
-			// Append the received message to the internal messages slice.
-			messages = append(messages, value)
-			// Release the mutex.
-			mutex.Unlock()
-			// Send the received message to the /consumer endpoint via the consumerMessages channel.
-			consumerMessages <- value
-		}
-	}
-}
+func syncModeF(c *fiber.Ctx, taskid string) error {
+	wait := req(taskid)
+	resAsPb := <-wait
+
+	slog.Debug("resAsPb.TaskResultHeader", "resAsPb.TaskResultHeader", resAsPb.TaskResultHeader)
+	//fmt.Println("resAsPb.TaskResultHeader", string(resAsPb.TaskResultHeader), len(resAsPb.TaskResultHeader))
+
+	if resAsPb.TaskResultHeader != nil {
+		if len(resAsPb.TaskResultHeader) != 0 {
+			headers := make(map[string][]string)
+
+			if err := json.Unmarshal(resAsPb.TaskResultHeader, &headers); err != nil {
+				slog.Error("syncModeF", "json.Unmarshal(resAsPb.TaskResultHeader", err.Error())
+				return c.SendString(fmt.Sprintf("json.Unmarshal(resAsPb.TaskResultHeader error: %v", err.Error()))
+			}
+
+			for k, vs := range headers {
+				for _, v := range vs {
+					if k == "Content-Encoding" {
+						c.Response().Header.SetContentEncoding(v)
+					}
+					if k == "Content-Type" {
+						c.Response().Header.SetContentType(v)
+					}
+				}
+			}
+		}
+	}
+
+	return c.Send(resAsPb.GetTaskResultBody())
+}
+/////////////////////////////
+//
+//
+////////////////////////////
+// func kafkaProducer() {
+// 	// Create a new Sarama configuration for the Kafka producer.
+// 	config := sarama.NewConfig()
+// 	// Create a new Kafka producer using the specified configuration and broker addresses.
+// 	producer, err := sarama.NewAsyncProducer(kafkaBrokers, config)
+// 	if err != nil {
+// 		panic(fmt.Sprint("Failed to start Kafka producer:", err))
+// 	}
+// 	// Ensure the Kafka producer is closed when the function ends (deferred execution).
+// 	defer producer.Close()
+// 	for message := range producerMessages {
+// 		counterStr := fmt.Sprintf("%d", counter)
+// 		// Get the Indian Standard Time (IST) location
+// 		istLocation, err := time.LoadLocation("Asia/Kolkata")
+// 		if err != nil {
+// 			slog.Error("message := range producerMessages", "Failed to load IST location:", err)
+// 			continue
+// 		}
+// 		// Convert current time to IST
+// 		istTime := time.Now().In(istLocation).Format("02-01-2006 15:04:05")
+// 		value := fmt.Sprintf("(%s, %s, %s)", counterStr, istTime, message)
+// 		producer.Input() <- &sarama.ProducerMessage{
+// 			Topic: topic,
+// 			Value: sarama.StringEncoder(value),
+// 		}
+// 		slog.Debug("send to kafka", "msg", string(value))
+// 		counter++
+// 		// Introduce random delay between 1 to 3 seconds for message push
+// 		//time.Sleep(time.Duration(1+rand.Intn(3)) * time.Second)
+// 	}
+// }
+// func kafkaConsumer(wg *sync.WaitGroup) {
+// 	// Create a new Sarama configuration for the Kafka producer.
+// 	config := sarama.NewConfig()
+// 	// Create a new Kafka consumer using the specified configuration and broker addresses.
+// 	consumer, err := sarama.NewConsumer(kafkaBrokers, config)
+// 	if err != nil {
+// 		log.Fatal("Failed to start Kafka consumer:", err)
+// 	}
+// 	// Ensure the Kafka consumer is closed when the function ends (deferred execution).
+// 	defer consumer.Close()
+// 	// Create a partition consumer for the specified topic, partition, and starting offset.
+// 	// The starting offset is set to sarama.OffsetNewest, which means the consumer will start consuming messages from the latest available offset.
+// 	partitionConsumer, err := consumer.ConsumePartition(topic, 0, sarama.OffsetNewest)
+// 	if err != nil {
+// 		log.Fatal("Failed to start partition consumer:", err)
+// 	}
+// 	// Ensure the partition consumer is closed when the function ends (deferred execution).
+// 	defer partitionConsumer.Close()
+// 	// Signal that the consumer goroutine is ready
+// 	defer wg.Done()
+// 	// Infinite loop to continuously listen for messages from the partitionConsumer.Messages() channel.
+// 	for {
+// 		select {
+// 		case message := <-partitionConsumer.Messages():
+// 			value := string(message.Value)
+// 			fmt.Printf("Received message from Kafka: %s\n", value)
+// 			// Acquire the mutex before appending to the messages slice to ensure concurrent-safe access.
+// 			mutex.Lock()
+// 			// Append the received message to the internal messages slice.
+// 			messages = append(messages, value)
+// 			// Release the mutex.
+// 			mutex.Unlock()
+// 			// Send the received message to the /consumer endpoint via the consumerMessages channel.
+// 			consumerMessages <- value
+// 		}
+// 	}
+// }