Commit 288aaa27 authored by Your Name's avatar Your Name

add questDB

parent 2889a103
......@@ -11,7 +11,6 @@ require (
github.com/gofiber/fiber/v2 v2.52.0
github.com/gogo/protobuf v1.3.2
github.com/odysseus/odysseus-protocol v0.0.0-00010101000000-000000000000
github.com/odysseus/payment v0.0.0-00010101000000-000000000000
)
require (
......@@ -59,14 +58,16 @@ require (
github.com/mattn/go-isatty v0.0.20 // indirect
github.com/mattn/go-runewidth v0.0.15 // indirect
github.com/mitchellh/mapstructure v1.5.0 // indirect
github.com/odysseus/payment v0.0.0-00010101000000-000000000000 // indirect
github.com/oklog/ulid v1.3.1 // indirect
github.com/pierrec/lz4/v4 v4.1.18 // indirect
github.com/pkg/errors v0.9.1 // indirect
github.com/questdb/go-questdb-client/v2 v2.0.0 // indirect
github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475 // indirect
github.com/redis/go-redis/v9 v9.4.0 // indirect
github.com/rivo/uniseg v0.2.0 // indirect
github.com/shopspring/decimal v1.3.1 // indirect
github.com/sirupsen/logrus v1.4.2 // indirect
github.com/sirupsen/logrus v1.9.3 // indirect
github.com/swaggo/swag v1.16.2 // indirect
github.com/valyala/bytebufferpool v1.0.0 // indirect
github.com/valyala/fasthttp v1.51.0 // indirect
......@@ -76,7 +77,7 @@ require (
golang.org/x/net v0.17.0 // indirect
golang.org/x/sys v0.15.0 // indirect
golang.org/x/text v0.13.0 // indirect
golang.org/x/tools v0.7.0 // indirect
golang.org/x/tools v0.14.0 // indirect
google.golang.org/protobuf v1.32.0 // indirect
gopkg.in/yaml.v2 v2.4.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
......
......@@ -19,6 +19,8 @@ github.com/andybalholm/brotli v1.0.5/go.mod h1:fO7iG3H7G2nSZ7m0zPUDn85XEX2GTukHG
github.com/asaskevich/govalidator v0.0.0-20200907205600-7a23bdc65eef/go.mod h1:WaHUgvxTVq04UNunO+XhnAqY/wQc+bxr74GqbsZ/Jqw=
github.com/asaskevich/govalidator v0.0.0-20230301143203-a9d515a09cc2 h1:DklsrG3dyBCFEj5IhUbnKptjxatkF07cF2ak3yi77so=
github.com/asaskevich/govalidator v0.0.0-20230301143203-a9d515a09cc2/go.mod h1:WaHUgvxTVq04UNunO+XhnAqY/wQc+bxr74GqbsZ/Jqw=
github.com/astaxie/beego v1.10.1 h1:M2ciUnyiZycuTpGEA+idJF0gX24h58EbPvGqjnO/DCg=
github.com/astaxie/beego v1.10.1/go.mod h1:0R4++1tUqERR0WYFWdfkcrsyoVBCG4DgpDGokT3yb+U=
github.com/astaxie/beego v1.12.3 h1:SAQkdD2ePye+v8Gn1r4X6IKZM1wd28EyUOVQ3PDSOOQ=
github.com/astaxie/beego v1.12.3/go.mod h1:p3qIm0Ryx7zeBHLljmd7omloyca1s4yu1a8kM1FkpIA=
github.com/beego/goyaml2 v0.0.0-20130207012346-5545475820dd/go.mod h1:1b+Y/CofkYwXMUU0OhQqGvsY2Bvgr4j6jfT699wyZKQ=
......@@ -151,6 +153,8 @@ github.com/golang/snappy v0.0.0-20180518054509-2e65f85255db/go.mod h1:/XxbfmMg8l
github.com/golang/snappy v0.0.1/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
github.com/golang/snappy v0.0.4 h1:yAGX7huGHXlcLOEtBnF4w7FQwA26wojNCwOYAEhLjQM=
github.com/golang/snappy v0.0.4/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
github.com/gomodule/redigo v1.8.9 h1:Sl3u+2BI/kk+VEatbj0scLdrFhjPmbxOc1myhDP41ws=
github.com/gomodule/redigo v1.8.9/go.mod h1:7ArFNvsTjH8GMMzB4uy1snslv2BwmginuMs06a1uzZE=
github.com/gomodule/redigo v2.0.0+incompatible h1:K/R+8tc58AaqLkqG2Ol3Qk+DR/TlNuhuh457pBFPtt0=
github.com/gomodule/redigo v2.0.0+incompatible/go.mod h1:B4C85qUVwatsJoIUNIfCRsp7qO0iAmpGFZ4EELWSbC4=
github.com/google/go-cmp v0.3.0/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU=
......@@ -269,6 +273,8 @@ github.com/prometheus/common v0.10.0/go.mod h1:Tlit/dnDKsSWFlCLTWaA1cyBgKHSMdTB8
github.com/prometheus/procfs v0.0.0-20181005140218-185b4288413d/go.mod h1:c3At6R/oaqEKCNdg8wHV1ftS6bRYblBhIjjI8uT2IGk=
github.com/prometheus/procfs v0.0.2/go.mod h1:TjEm7ze935MbeOT/UhFTIMYKhuLP4wbCsTZCD3I8kEA=
github.com/prometheus/procfs v0.1.3/go.mod h1:lV6e/gmhEcM9IjHGsFOCxxuZ+z1YqCvr4OA4YeYWdaU=
github.com/questdb/go-questdb-client/v2 v2.0.0 h1:5N4K/siiiuwZ8Lzx+3y+mrKMX5G+8D1FuVXUtbZmgnk=
github.com/questdb/go-questdb-client/v2 v2.0.0/go.mod h1:7E8ymLWyraJprj77cN8VNJq0w4GABHtT8WxMYAOqLEM=
github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475 h1:N/ElC8H3+5XpJzTSTfLsJV/mx9Q9g7kxmchpfZyxgzM=
github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475/go.mod h1:bCqnVzQkZxMG4s8nGwiZ5l3QUCyqpo9Y+/ZMZ9VjZe4=
github.com/redis/go-redis/v9 v9.4.0 h1:Yzoz33UZw9I/mFhx4MNrB6Fk+XHO1VukNcCa1+lwyKk=
......@@ -289,6 +295,8 @@ github.com/sirupsen/logrus v1.4.0/go.mod h1:LxeOpSwHxABJmUn/MG1IvRgCAasNZTLOkJPx
github.com/sirupsen/logrus v1.4.1/go.mod h1:ni0Sbl8bgC9z8RoU9G6nDWqqs/fq4eDPysMBDgk/93Q=
github.com/sirupsen/logrus v1.4.2 h1:SPIRibHv4MatM3XXNO2BJeFLZwZ2LvZgfQ5+UNI2im4=
github.com/sirupsen/logrus v1.4.2/go.mod h1:tLMulIdttU9McNUspp0xgXVQah82FyeX6MwdIuYE2rE=
github.com/sirupsen/logrus v1.9.3 h1:dueUQJ1C2q9oE3F7wvmSGAaVtTmUizReu6fjN8uqzbQ=
github.com/sirupsen/logrus v1.9.3/go.mod h1:naHLuLoDiP4jHNo9R0sCBMtWGeIprob74mVsIT4qYEQ=
github.com/spf13/cobra v0.0.3/go.mod h1:1l0Ry5zgKvJasoi3XT1TypsSe7PqH0Sj9dhYf7v3XqQ=
github.com/spf13/pflag v1.0.3/go.mod h1:DYY7MBk1bdzusC3SYhjObp+wFpr4gzcvqqNjLnInEg4=
github.com/ssdb/gossdb v0.0.0-20180723034631-88f6b59b84ec/go.mod h1:QBvMkMya+gXctz3kmljlUCu/yB3GZ6oee+dUozsezQE=
......@@ -395,6 +403,7 @@ golang.org/x/sys v0.0.0-20210420072515-93ed5bcd2bfe/go.mod h1:h1NjWce9XRLGQEsW7w
golang.org/x/sys v0.0.0-20210423082822-04245dca01da/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220811171246-fbc7d0a398ab/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.5.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
......@@ -423,6 +432,7 @@ golang.org/x/tools v0.0.0-20210106214847-113979e3529a/go.mod h1:emZCQorbCU4vsT4f
golang.org/x/tools v0.1.12/go.mod h1:hNGJHUnrk76NpqgfD5Aqm5Crs+Hm0VOH/i9J2+nxYbc=
golang.org/x/tools v0.7.0 h1:W4OVu8VVOaIO0yzWMNdepAulS7YfoS3Zabrm8DOXXU4=
golang.org/x/tools v0.7.0/go.mod h1:4pg6aUX35JBAogB10C9AtvVL+qowtN4pT3CGSQex14s=
golang.org/x/tools v0.14.0/go.mod h1:uYBEerGOWcJyEORxN+Ek8+TT266gXkNlHdJBwexUsBg=
golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
......
......@@ -14,14 +14,9 @@ import (
"github.com/IBM/sarama"
"github.com/gofiber/fiber/v2"
// "github.com/gogo/protobuf/proto"
// "github.com/gogo/protobuf/types"
gogoPbProto "github.com/gogo/protobuf/proto"
//pbtypes "github.com/gogo/protobuf/types"
pbUpstream "github.com/odysseus/odysseus-protocol/gen/proto/go/base/v1"
//omanager "github.com/odysseus/odysseus-protocol/gen/proto/go/nodemanager/v1"
qdb "github.com/questdb/go-questdb-client/v2"
"github.com/odysseus/payment/cachedata"
"github.com/odysseus/payment/model"
......@@ -78,22 +73,12 @@ func kafkaProducerBytes() {
for message := range producerMessagesBytes {
header := make([]sarama.RecordHeader, 0, 1)
for k, v := range message.HttpHeader {
header = append(header, sarama.RecordHeader{
Key: []byte(k),
Value: []byte(v),
})
}
v := append(sarama.ByteEncoder{}, message.Bytes...)
//sarama.ByteEncoder =
producer.Input() <- &sarama.ProducerMessage{
Topic: aigcTopic,
Value: v,
Headers: header,
}
// Introduce random delay between 1 to 3 seconds for message push
......@@ -101,47 +86,13 @@ func kafkaProducerBytes() {
}
}
func kafkaProducer() {
// Create a new Sarama configuration for the Kafka producer.
config := sarama.NewConfig()
// Create a new Kafka producer using the specified configuration and broker addresses.
producer, err := sarama.NewAsyncProducer(kafkaBrokers, config)
if err != nil {
log.Fatal("Failed to start Kafka producer:", err)
}
// Ensure the Kafka producer is closed when the function ends (deferred execution).
defer producer.Close()
for message := range producerMessages {
counterStr := fmt.Sprintf("%d", counter)
// Get the Indian Standard Time (IST) location
istLocation, err := time.LoadLocation("Asia/Kolkata")
if err != nil {
log.Fatal("Failed to load IST location:", err)
}
// Convert current time to IST
istTime := time.Now().In(istLocation).Format("02-01-2006 15:04:05")
value := fmt.Sprintf("(%s, %s, %s)", counterStr, istTime, message)
producer.Input() <- &sarama.ProducerMessage{
Topic: topic,
Value: sarama.StringEncoder(value),
}
func kafkaConsumerBytes(done chan interface{}, req, resTopic string) (chan pbUpstream.TaskContent, chan pbUpstream.TaskReceipt) {
fmt.Printf("Message sent: %s\n", value)
counter++
reqOutStream := make(chan pbUpstream.TaskContent, 1000)
// Introduce random delay between 1 to 3 seconds for message push
time.Sleep(time.Duration(1+rand.Intn(3)) * time.Second)
}
}
func kafkaConsumerBytes(wg *sync.WaitGroup) {
resOutStream := make(chan pbUpstream.TaskReceipt, 1000)
go func() {
// Create a new Sarama configuration for the Kafka producer.
config := sarama.NewConfig()
......@@ -156,116 +107,158 @@ func kafkaConsumerBytes(wg *sync.WaitGroup) {
// Create a partition consumer for the specified topic, partition, and starting offset.
// The starting offset is set to sarama.OffsetNewest, which means the consumer will start consuming messages from the latest available offset.
partitionConsumer, err := consumer.ConsumePartition(aigcTopic, 0, sarama.OffsetNewest)
reqConsumer, err := consumer.ConsumePartition(req, 0, sarama.OffsetNewest)
if err != nil {
log.Fatal("Failed to start partition consumer:", err)
}
// Ensure the partition consumer is closed when the function ends (deferred execution).
defer partitionConsumer.Close()
defer reqConsumer.Close()
resConsumer, err := consumer.ConsumePartition(resTopic, 0, sarama.OffsetNewest)
if err != nil {
log.Fatal("Failed to start partition consumer:", err)
}
// Ensure the partition consumer is closed when the function ends (deferred execution).
defer resConsumer.Close()
// Signal that the consumer goroutine is ready
wg.Done()
//wg.Done()
// Infinite loop to continuously listen for messages from the partitionConsumer.Messages() channel.
for {
for {
select {
case message := <-partitionConsumer.Messages():
case message := <-reqConsumer.Messages():
jsonMsg, err := json.Marshal(message)
if message == nil {
fmt.Println("reqConsumer message is nil ")
continue
}
if err != nil {
fmt.Println("consumer error", err.Error())
if message.Value == nil {
fmt.Println("reqConsumer message.Value is nil ")
continue
}
//baseapi.
pbMsg := pbUpstream.TaskContent{}
value := string(jsonMsg)
fmt.Printf("Received message from Kafka: %s\n", value)
// Acquire the mutex before appending to the messages slice to ensure concurrent-safe access.
mutex.Lock()
// Append the received message to the internal messages slice.
messages = append(messages, value)
// Release the mutex.
mutex.Unlock()
// Send the received message to the /consumer endpoint via the consumerMessages channel.
consumerMessages <- value
if err := gogoPbProto.Unmarshal(message.Value, &pbMsg); err != nil {
fmt.Println("kafkaConsumerBytes gogoPbProto.Unmarshal", err.Error())
continue
}
fmt.Println("reqConsumer msg", pbMsg.TaskUuid)
select {
case <-done:
return
case reqOutStream <- pbMsg:
}
}
func kafkaConsumer(wg *sync.WaitGroup) {
case message := <-resConsumer.Messages():
// Create a new Sarama configuration for the Kafka producer.
config := sarama.NewConfig()
if message == nil {
fmt.Println("resConsumer message is nil ")
continue
}
// Create a new Kafka consumer using the specified configuration and broker addresses.
consumer, err := sarama.NewConsumer(kafkaBrokers, config)
if err != nil {
log.Fatal("Failed to start Kafka consumer:", err)
if message.Value == nil {
fmt.Println("resConsumer message.Value is nil ")
continue
}
pbMsg := pbUpstream.TaskReceipt{}
// Ensure the Kafka consumer is closed when the function ends (deferred execution).
defer consumer.Close()
if err := gogoPbProto.Unmarshal(message.Value, &pbMsg); err != nil {
fmt.Println("kafkaConsumerBytes gogoPbProto.Unmarshal", err.Error())
continue
}
// Create a partition consumer for the specified topic, partition, and starting offset.
// The starting offset is set to sarama.OffsetNewest, which means the consumer will start consuming messages from the latest available offset.
partitionConsumer, err := consumer.ConsumePartition(topic, 0, sarama.OffsetNewest)
if err != nil {
log.Fatal("Failed to start partition consumer:", err)
fmt.Println("resConsumer msg", pbMsg.TaskUuid, "len(resOutStream)", len(resOutStream))
select {
case <-done:
return
case resOutStream <- pbMsg:
}
// Ensure the partition consumer is closed when the function ends (deferred execution).
defer partitionConsumer.Close()
case <-done:
return
}
}
}()
// Signal that the consumer goroutine is ready
defer wg.Done()
return reqOutStream, resOutStream
}
func batchToQuestDb(done chan interface{}, reqStream chan pbUpstream.TaskContent, resStream chan pbUpstream.TaskReceipt, reqTableName string, resTableName, questAddr string) {
//tableName := "tasks"
ctx := context.TODO()
// Connect to QuestDB running on 127.0.0.1:9009
//addrOpt := qdb.WithAddress("192.168.1.220:9009")
addrOpt := qdb.WithAddress(questAddr)
sender, err := qdb.NewLineSender(ctx, addrOpt)
if err != nil {
log.Fatal(err)
}
// Make sure to close the sender on exit to release resources.
defer sender.Close()
// Infinite loop to continuously listen for messages from the partitionConsumer.Messages() channel.
for {
select {
case message := <-partitionConsumer.Messages():
value := string(message.Value)
fmt.Printf("Received message from Kafka: %s\n", value)
// Acquire the mutex before appending to the messages slice to ensure concurrent-safe access.
mutex.Lock()
// Append the received message to the internal messages slice.
messages = append(messages, value)
// Release the mutex.
mutex.Unlock()
// Send the received message to the /consumer endpoint via the consumerMessages channel.
consumerMessages <- value
}
case task := <-resStream:
sender.Table(resTableName).
Symbol("type", fmt.Sprintf("%d", task.TaskId)).
Symbol("uid", fmt.Sprintf("%d", task.TaskUid)).
Symbol("profit_acc", task.TaskProfitAccount).
Symbol("worker_acc", task.TaskWorkerAccount).
Symbol("result", task.TaskResult).
StringColumn("id", task.TaskUuid).
TimestampColumn("time", time.UnixMilli(int64(task.TaskFinishTime))).
Int64Column("fee", task.TaskFee).
Int64Column("workload", int64(task.TaskWorkload)).
Int64Column("out_len", int64(task.TaskOutLen)).
Int64Column("duration", int64(task.TaskDuration)).
AtNow(ctx)
err = sender.Flush(ctx)
if err != nil {
log.Fatal(err)
}
}
// RoundStepType enumerates the state of the consensus state machine
type RoundStepType uint8 // These must be numeric, ordered.
case task := <-reqStream:
taskFeeAsInt, err := strconv.Atoi(task.TaskFee)
if err != nil {
fmt.Printf("task.TaskFee string to int error: %v\n", err)
continue
}
// RoundStepType
const (
ChatCompletionsType pbUpstream.TaskType = 1
ImagesGenerationsType
sender.Table(reqTableName).
Symbol("type", fmt.Sprintf("%d", task.TaskType)).
Symbol("uid", fmt.Sprintf("%d", task.TaskUid)).
StringColumn("id", task.TaskUuid).
TimestampColumn("time", time.UnixMilli(int64(task.TaskTimestamp))).
Int64Column("fee", int64(taskFeeAsInt)).
Int64Column("in_len", int64(task.TaskInLen)).
AtNow(ctx)
// ImagesVariations = "ImagesVariations"
// ImagesEdits = "ImagesEdits"
)
err = sender.Flush(ctx)
if err != nil {
log.Fatal(err)
}
const (
ChatCompletionsFeeStr = "10"
ImagesGenerationsFeeStr = "20"
ChatCompletionsFee = 10
ImagesGenerationsFee = 20
)
case <-done:
return
//POST
//https://api.openai.com/v1/chat/completions
}
// PrevoteType SignedMsgType = 0x01
// PrecommitType SignedMsgType = 0x02
// ProposalType SignedMsgType = 0x20
}
}
type ResponseJson struct {
TaskUUID string
......@@ -306,10 +299,11 @@ func main() {
go kafkaProducerBytes()
// Launch the Kafka consumer goroutine in the background, passing the WaitGroup for synchronization.
//go kafkaConsumer(wg)
done := make(chan interface{})
//go kafkaConsumerBytes(wg)
reqToQuestDb, resToQuestDb := kafkaConsumerBytes(done, aigcTopic, "taskreceipt")
go batchToQuestDb(done, reqToQuestDb, resToQuestDb, "tasks", "bills", "192.168.1.220:9009")
go recordUUID()
......@@ -422,3 +416,92 @@ func main() {
log.Fatal(app.Listen("0.0.0.0:6000"))
}
/////////////////////////////
//
//
////////////////////////////
// kafkaProducer reads plain-text messages from the package-level
// producerMessages channel, tags each with a monotonically increasing
// counter and an IST timestamp, and publishes them to the package-level
// `topic` via an async Sarama producer. It blocks until the
// producerMessages channel is closed (or log.Fatal exits the process).
//
// NOTE(review): this path appears superseded by kafkaProducerBytes and
// is no longer started from main — confirm before further changes.
func kafkaProducer() {
	// Create a new Sarama configuration for the Kafka producer.
	config := sarama.NewConfig()

	// Create a new Kafka producer using the specified configuration and broker addresses.
	producer, err := sarama.NewAsyncProducer(kafkaBrokers, config)
	if err != nil {
		log.Fatal("Failed to start Kafka producer:", err)
	}

	// Ensure the Kafka producer is closed when the function ends (deferred execution).
	defer producer.Close()

	// The IST location is loop-invariant: load it once up front instead of
	// on every message. This also fails fast, before any message is consumed,
	// rather than killing the process mid-stream on the first iteration.
	istLocation, err := time.LoadLocation("Asia/Kolkata")
	if err != nil {
		log.Fatal("Failed to load IST location:", err)
	}

	for message := range producerMessages {
		counterStr := fmt.Sprintf("%d", counter)

		// Convert current time to IST.
		istTime := time.Now().In(istLocation).Format("02-01-2006 15:04:05")

		value := fmt.Sprintf("(%s, %s, %s)", counterStr, istTime, message)

		producer.Input() <- &sarama.ProducerMessage{
			Topic: topic,
			Value: sarama.StringEncoder(value),
		}

		fmt.Printf("Message sent: %s\n", value)
		counter++

		// Introduce random delay between 1 to 3 seconds for message push.
		time.Sleep(time.Duration(1+rand.Intn(3)) * time.Second)
	}
}
// kafkaConsumer consumes string messages from partition 0 of the
// package-level `topic` starting at the newest offset, appends each to
// the shared `messages` slice (guarded by `mutex`), and forwards it on
// the consumerMessages channel. wg.Done is called once the consumer is
// fully set up so callers can wg.Wait for readiness. The function never
// returns normally (the consume loop has no stop condition).
//
// NOTE(review): this path appears superseded by kafkaConsumerBytes and
// is no longer started from main — confirm before further changes.
func kafkaConsumer(wg *sync.WaitGroup) {
	// Create a new Sarama configuration for the Kafka consumer.
	config := sarama.NewConfig()

	// Create a new Kafka consumer using the specified configuration and broker addresses.
	consumer, err := sarama.NewConsumer(kafkaBrokers, config)
	if err != nil {
		log.Fatal("Failed to start Kafka consumer:", err)
	}
	// Ensure the Kafka consumer is closed when the function ends (deferred execution).
	defer consumer.Close()

	// Create a partition consumer for partition 0 of the topic.
	// sarama.OffsetNewest means consumption starts from the latest available offset.
	partitionConsumer, err := consumer.ConsumePartition(topic, 0, sarama.OffsetNewest)
	if err != nil {
		log.Fatal("Failed to start partition consumer:", err)
	}
	// Ensure the partition consumer is closed when the function ends (deferred execution).
	defer partitionConsumer.Close()

	// Signal readiness NOW, not via defer: the loop below never exits, so a
	// deferred wg.Done() would never fire and any caller blocked on wg.Wait()
	// for consumer readiness would deadlock.
	wg.Done()

	// Continuously receive messages; `range` on the Messages() channel is the
	// idiomatic form of the previous single-case select loop.
	for message := range partitionConsumer.Messages() {
		value := string(message.Value)
		fmt.Printf("Received message from Kafka: %s\n", value)

		// Guard the shared messages slice for concurrent-safe access.
		mutex.Lock()
		messages = append(messages, value)
		mutex.Unlock()

		// Forward the message to the /consumer endpoint via the channel.
		consumerMessages <- value
	}
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment