Commit 4cc6722a authored by Your Name

err sleep try: replace start-up panics with log-and-sleep retry loops

parent 2017526d
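The theme of the commit: every start-up panic is replaced with a log-sleep-retry loop, so the service keeps waiting for its dependencies instead of crashing. A minimal self-contained sketch of the pattern, where dial is a hypothetical stand-in for any fallible constructor (Kafka producer, consumer, QuestDB sender):

package main

import (
	"errors"
	"log/slog"
	"time"
)

// dial is a hypothetical stand-in for any fallible initialization.
func dial() (string, error) { return "", errors.New("dependency not ready") }

func main() {
	var conn string
	var err error
	for {
		conn, err = dial()
		if err != nil {
			// Log the failure and retry after a fixed 3s pause,
			// exactly as the loops in this commit do.
			slog.Error("dial", "err", err.Error())
			time.Sleep(time.Second * 3)
		} else {
			break
		}
	}
	_ = conn // ready to use
}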
@@ -70,11 +70,19 @@ func kafkaProducerBytes() {
// Create a new Sarama configuration for the Kafka producer.
config := sarama.NewConfig()
// Create a new Kafka producer using the specified configuration and broker addresses.
producer, err := sarama.NewAsyncProducer(kafkaBrokers, config)
var producer sarama.AsyncProducer
var err error
for {
// Create a new Kafka producer using the specified configuration and broker addresses.
producer, err = sarama.NewAsyncProducer(kafkaBrokers, config)
if err != nil {
panic(fmt.Sprint("Failed to start Kafka producer:", err))
slog.Error("sarama.NewAsyncProducer", "err", err.Error())
time.Sleep(time.Second * 3)
} else {
break
}
}
// Ensure the Kafka producer is closed when the function ends (deferred execution).
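The same loop now appears four times in this file (async producer, two partition consumers, the QuestDB sender). A hedged refactor, assuming Go 1.18+ generics; retryForever is a hypothetical helper, not part of this commit:

// retryForever retries a constructor every 3s until it succeeds,
// logging each failure under the given name.
func retryForever[T any](name string, construct func() (T, error)) T {
	for {
		v, err := construct()
		if err == nil {
			return v
		}
		slog.Error(name, "err", err.Error())
		time.Sleep(time.Second * 3)
	}
}

With it, the producer setup above would collapse to:

producer := retryForever("sarama.NewAsyncProducer", func() (sarama.AsyncProducer, error) {
	return sarama.NewAsyncProducer(kafkaBrokers, config)
})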
@@ -93,12 +101,12 @@ func kafkaProducerBytes() {
// Introduce random delay between 1 and 3 seconds for message push
//time.Sleep(time.Duration(1+rand.Intn(3)) * time.Second)
}
}
func kafkaConsumerBytes(done chan interface{}, req, resTopic string) (chan pbUpstream.TaskContent, chan pbUpstream.TaskReceipt) {
reqOutStream := make(chan pbUpstream.TaskContent, 1000)
resOutStream := make(chan pbUpstream.TaskReceipt, 1000)
go func() {
@@ -106,22 +114,36 @@ func kafkaConsumerBytes(done chan interface{}, req, resTopic string) (chan pbUps
// Create a new Sarama configuration for the Kafka producer.
config := sarama.NewConfig()
var consumer sarama.Consumer
var err error
for {
// Create a new Kafka consumer using the specified configuration and broker addresses.
consumer, err := sarama.NewConsumer(kafkaBrokers, config)
consumer, err = sarama.NewConsumer(kafkaBrokers, config)
if err != nil {
panic(fmt.Sprint("Failed to start Kafka consumer:", err))
slog.Error("kafkaConsumerBytes", "err", err.Error())
time.Sleep(time.Second * 3)
} else {
break
}
}
// Ensure the Kafka consumer is closed when the function ends (deferred execution).
defer consumer.Close()
var reqConsumer sarama.PartitionConsumer
for {
// Create a partition consumer for the specified topic, partition, and starting offset.
// The starting offset is set to sarama.OffsetNewest, which means the consumer will start consuming messages from the latest available offset.
reqConsumer, err := consumer.ConsumePartition(req, 0, sarama.OffsetNewest)
reqConsumer, err = consumer.ConsumePartition(req, 0, sarama.OffsetNewest)
if err != nil {
panic(fmt.Sprint("Failed to start partition consumer:", err))
slog.Error("consumer.ConsumePartition", "err", err.Error())
time.Sleep(time.Second * 3)
} else {
break
}
}
// Ensure the partition consumer is closed when the function ends (deferred execution).
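One caveat worth noting: with sarama.OffsetNewest, anything produced while the consumer was still stuck in its retry loop is skipped. If replay is wanted instead, the earliest retained offset is available; this is an alternative, not what the commit does:

// Alternative: resume from the earliest retained offset so messages
// produced during downtime are replayed.
reqConsumer, err = consumer.ConsumePartition(req, 0, sarama.OffsetOldest)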
@@ -164,10 +186,19 @@ func kafkaConsumerBytes(done chan interface{}, req, resTopic string) (chan pbUps
// Create a new Sarama configuration for the Kafka producer.
config := sarama.NewConfig()
var consumer sarama.Consumer
var err error
for {
// Create a new Kafka consumer using the specified configuration and broker addresses.
consumer, err := sarama.NewConsumer(kafkaBrokers, config)
consumer, err = sarama.NewConsumer(kafkaBrokers, config)
if err != nil {
panic(fmt.Sprint("Failed to start Kafka consumer:", err))
slog.Error("kafkaConsumerBytes", "err", err.Error())
time.Sleep(time.Second * 3)
} else {
break
}
}
// Ensure the Kafka consumer is closed when the function ends (deferred execution).
@@ -183,10 +214,23 @@ func kafkaConsumerBytes(done chan interface{}, req, resTopic string) (chan pbUps
// // Ensure the partition consumer is closed when the function ends (deferred execution).
// defer reqConsumer.Close()
resConsumer, err := consumer.ConsumePartition(resTopic, 0, sarama.OffsetNewest)
// resConsumer, err := consumer.ConsumePartition(resTopic, 0, sarama.OffsetNewest)
// if err != nil {
// panic(fmt.Sprint("Failed to start partition consumer:", err))
// }
var resConsumer sarama.PartitionConsumer
for {
// Create a partition consumer for the specified topic, partition, and starting offset.
// The starting offset is set to sarama.OffsetNewest, which means the consumer will start consuming messages from the latest available offset.
resConsumer, err = consumer.ConsumePartition(resTopic, 0, sarama.OffsetNewest)
if err != nil {
panic(fmt.Sprint("Failed to start partition consumer:", err))
slog.Error("consumer.ConsumePartition", "err", err.Error())
time.Sleep(time.Second * 3)
} else {
break
}
}
// Ensure the partition consumer is closed when the function ends (deferred execution).
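The hand-rolled loops could also be complemented by sarama's own timeout and retry settings; a sketch of plausible tuning (these Config fields exist in the sarama package, the values here are illustrative):

config := sarama.NewConfig()
config.Net.DialTimeout = 10 * time.Second       // give up on a single dial sooner
config.Metadata.Retry.Max = 5                   // broker metadata fetch retries
config.Metadata.Retry.Backoff = 3 * time.Second // matches the 3s rhythm used in this commit
config.Consumer.Retry.Backoff = 3 * time.Second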
@@ -245,9 +289,17 @@ func batchToQuestDb(done chan interface{}, reqStream chan pbUpstream.TaskContent
//addrOpt := qdb.WithAddress("192.168.1.220:9009")
addrOpt := qdb.WithAddress(questAddr)
sender, err := qdb.NewLineSender(ctx, addrOpt)
var sender *qdb.LineSender
var err error
for {
sender, err = qdb.NewLineSender(ctx, addrOpt)
if err != nil {
panic(err)
slog.Error("qdb.NewLineSender", "err", err.Error())
time.Sleep(time.Second * 3)
} else {
break
}
}
// Make sure to close the sender on exit to release resources.
defer sender.Close()
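For context, the sender buffers rows and ships them on Flush. A minimal write, assuming the go-questdb-client v1/v2 API used here (where At takes a UnixNano timestamp); table and column names are illustrative:

// Buffer one row, then flush it to QuestDB over ILP.
err = sender.Table("tasks").
	Symbol("uid", "user-1").
	Int64Column("type", 1).
	At(ctx, time.Now().UnixNano())
if err == nil {
	err = sender.Flush(ctx)
}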
@@ -261,6 +313,8 @@ func batchToQuestDb(done chan interface{}, reqStream chan pbUpstream.TaskContent
slog.Debug("questdb <- resStream", "task.TaskId", task.TaskId, "task.TaskUid", task.TaskUid)
for {
sender.Table(resTableName).
Symbol("type", fmt.Sprintf("%d", task.TaskType)).
Symbol("uid", task.TaskUid).
@@ -280,7 +334,11 @@ func batchToQuestDb(done chan interface{}, reqStream chan pbUpstream.TaskContent
err = sender.Flush(ctx)
if err != nil {
slog.Error("task := <-resStream", "error", err.Error())
//log.Fatal(err)
time.Sleep(time.Second * 3)
} else {
break
}
}
case task := <-reqStream:
@@ -311,6 +369,8 @@ func batchToQuestDb(done chan interface{}, reqStream chan pbUpstream.TaskContent
// t := time.Unix(seconds, nanoseconds%1e9)
// time.Unix()
for {
sender.
Table(reqTableName).
Symbol("type", fmt.Sprintf("%d", task.TaskType)).
@@ -324,15 +384,15 @@ func batchToQuestDb(done chan interface{}, reqStream chan pbUpstream.TaskContent
err = sender.Flush(ctx)
if err != nil {
slog.Error("task := <-reqStream", "error", err.Error())
//fmt.Println("task := <-reqStream sender.Flush", err.Error())
//log.Fatal(err)
time.Sleep(time.Second * 3)
} else {
break
}
}
case <-done:
return
}
}
}
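The two flush loops above retry forever, which can stall the whole batcher on a persistent QuestDB outage. A bounded alternative is sketched below; flushWithRetry is a hypothetical name:

// flushWithRetry gives up after the given number of attempts and
// returns the last error instead of looping indefinitely.
func flushWithRetry(ctx context.Context, flush func(context.Context) error, attempts int) error {
	var err error
	for i := 1; i <= attempts; i++ {
		if err = flush(ctx); err == nil {
			return nil
		}
		slog.Error("flushWithRetry", "attempt", i, "err", err.Error())
		time.Sleep(time.Second * 3)
	}
	return err
}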
@@ -394,10 +454,12 @@ var createBillsTableSql string = `CREATE TABLE IF NOT EXISTS
result symbol CAPACITY 128 CACHE INDEX CAPACITY 8192
) timestamp (time) PARTITION BY DAY WAL;`
func createTable(ipAddr, tableSql string) {
func createTable(ipAddr, tableSql string) error {
//"http://localhost:9000"
u, err := url.Parse("http://" + ipAddr + ":9000")
checkErr(err)
if err != nil {
return err
}
u.Path += "exec"
params := url.Values{}
@@ -408,21 +470,22 @@ func createTable(ipAddr, tableSql string) {
url := fmt.Sprintf("%v", u)
res, err := http.Get(url)
checkErr(err)
if err != nil {
return err
}
defer res.Body.Close()
body, err := io.ReadAll(res.Body)
checkErr(err)
slog.Info("createTable", "response body", body)
}
func checkErr(err error) {
if err != nil {
panic(err)
return err
}
slog.Info("createTable", "response body", body)
return nil
}
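With checkErr gone, createTable now reports failures to its caller. A call site can reuse the commit's log-and-retry convention; questAddr and createBillsTableSql are the names already defined in this file:

// Keep retrying table creation until QuestDB accepts it.
for {
	if err := createTable(questAddr, createBillsTableSql); err != nil {
		slog.Error("createTable", "err", err.Error())
		time.Sleep(time.Second * 3)
		continue
	}
	break
}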
var taskJsonStr = `{"id":10,"desc":"test new addition","price":1000,"complexity":1,"hardware_require":"{ disk_size: 100, memory_size: 100 }","image_id":"13221312","image_url":"dasdasdasd","cmd":"{\"image_name\":\"demianhjw/aigic:0129\",\"docker_cmd\":{\"container_port\":\"5001\"},\"api_url\":\"http://127.0.0.1:%d/aigic\"}","workload":100,"api_path":"/demianhjw/aigic/0129","image_name":"demianhjw/aigic:0129","sign_url":"dsdsdsds","username":"sdsds","password":"sdsdsd","created_time":"2024-02-02T03:13:33+08:00","updated_time":"2024-02-02T03:13:33+08:00","deleted":0}`
@@ -513,8 +576,6 @@ func main() {
wg.Add(1)
// Launch the Kafka producer goroutine in the background.
//go kafkaProducer()
go kafkaProducerBytes()
done := make(chan interface{})
@@ -522,7 +583,6 @@ func main() {
reqToQuestDb, resToQuestDb := kafkaConsumerBytes(done, aigcProduceTopic, aigcConsumerTopic)
go batchToQuestDb(done, reqToQuestDb, resToQuestDb, "tasks", "bills", questAddr)
go recordUUID()
// Create a new instance of the Fiber web framework.
@@ -553,8 +613,6 @@ func main() {
slog.Debug("callback", "path", c.Route().Path)
//fmt.Println("c.Route().Path", c.Route().Path)
body := c.Body()
var resbody pbUpstream.TaskResponse
@@ -578,8 +636,9 @@ func main() {
replanceQueryTask = &task
panic(app.Listen(listenIpPort))
if err := app.Listen(listenIpPort); err != nil {
slog.Error("app.Listen", "err", err.Error())
}
}
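Since the Listen error is now only logged, main falls through and exits on failure. If a clean shutdown path is also wanted, Fiber exposes app.Shutdown(); a sketch assuming os, os/signal, and syscall are imported:

// Run the listener in the background and shut down on SIGINT/SIGTERM.
go func() {
	if err := app.Listen(listenIpPort); err != nil {
		slog.Error("app.Listen", "err", err.Error())
	}
}()
sig := make(chan os.Signal, 1)
signal.Notify(sig, syscall.SIGINT, syscall.SIGTERM)
<-sig
if err := app.Shutdown(); err != nil {
	slog.Error("app.Shutdown", "err", err.Error())
}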
var replanceQueryTask *model.TaskType
@@ -618,7 +677,7 @@ func ApiAndJWT(c *fiber.Ctx) error {
}
if len(uid) == 0 {
return c.SendString(fmt.Sprintf("uid can not be 0"))
return c.SendString(fmt.Sprintf("len(uid) can not be 0"))
}
uidAsInt, err := strconv.Atoi(uid[0])
@@ -765,93 +824,3 @@ func syncModeF(c *fiber.Ctx, taskid string) error {
return c.Send(resAsPb.GetTaskResultBody())
}
/////////////////////////////
//
//
////////////////////////////
// func kafkaProducer() {
// // Create a new Sarama configuration for the Kafka producer.
// config := sarama.NewConfig()
// // Create a new Kafka producer using the specified configuration and broker addresses.
// producer, err := sarama.NewAsyncProducer(kafkaBrokers, config)
// if err != nil {
// panic(fmt.Sprint("Failed to start Kafka producer:", err))
// }
// // Ensure the Kafka producer is closed when the function ends (deferred execution).
// defer producer.Close()
// for message := range producerMessages {
// counterStr := fmt.Sprintf("%d", counter)
// // Get the Indian Standard Time (IST) location
// istLocation, err := time.LoadLocation("Asia/Kolkata")
// if err != nil {
// slog.Error("message := range producerMessages", "Failed to load IST location:", err)
// continue
// }
// // Convert current time to IST
// istTime := time.Now().In(istLocation).Format("02-01-2006 15:04:05")
// value := fmt.Sprintf("(%s, %s, %s)", counterStr, istTime, message)
// producer.Input() <- &sarama.ProducerMessage{
// Topic: topic,
// Value: sarama.StringEncoder(value),
// }
// slog.Debug("send to kafka", "msg", string(value))
// counter++
// // Introduce random delay between 1 and 3 seconds for message push
// //time.Sleep(time.Duration(1+rand.Intn(3)) * time.Second)
// }
// }
// func kafkaConsumer(wg *sync.WaitGroup) {
// // Create a new Sarama configuration for the Kafka producer.
// config := sarama.NewConfig()
// // Create a new Kafka consumer using the specified configuration and broker addresses.
// consumer, err := sarama.NewConsumer(kafkaBrokers, config)
// if err != nil {
// log.Fatal("Failed to start Kafka consumer:", err)
// }
// // Ensure the Kafka consumer is closed when the function ends (deferred execution).
// defer consumer.Close()
// // Create a partition consumer for the specified topic, partition, and starting offset.
// // The starting offset is set to sarama.OffsetNewest, which means the consumer will start consuming messages from the latest available offset.
// partitionConsumer, err := consumer.ConsumePartition(topic, 0, sarama.OffsetNewest)
// if err != nil {
// log.Fatal("Failed to start partition consumer:", err)
// }
// // Ensure the partition consumer is closed when the function ends (deferred execution).
// defer partitionConsumer.Close()
// // Signal that the consumer goroutine is ready
// defer wg.Done()
// // Infinite loop to continuously listen for messages from the partitionConsumer.Messages() channel.
// for {
// select {
// case message := <-partitionConsumer.Messages():
// value := string(message.Value)
// fmt.Printf("Received message from Kafka: %s\n", value)
// // Acquire the mutex before appending to the messages slice to ensure concurrent-safe access.
// mutex.Lock()
// // Append the received message to the internal messages slice.
// messages = append(messages, value)
// // Release the mutex.
// mutex.Unlock()
// // Send the received message to the /consumer endpoint via the consumerMessages channel.
// consumerMessages <- value
// }
// }
// }