Commit 69353925 authored by Your Name

add proof table

parent 51392b31
@@ -115,10 +115,11 @@ func kafkaProducerBytes() {
 }
 
-func kafkaConsumerBytes(done chan interface{}, req, resTopic string) (chan pbUpstream.TaskContent, chan pbUpstream.TaskReceipt) {
+func kafkaConsumerBytes(done chan interface{}, req, resTopic, proofTopic string) (chan pbUpstream.TaskContent, chan pbUpstream.TaskReceipt, chan pbUpstream.TaskProof) {
 
 	reqOutStream := make(chan pbUpstream.TaskContent, 1000)
 	resOutStream := make(chan pbUpstream.TaskReceipt, 1000)
+	proofOutStream := make(chan pbUpstream.TaskProof, 1000)
 
 	go func() {
@@ -315,10 +316,120 @@ func kafkaConsumerBytes(done chan interface{}, req, resTopic string) (chan pbUps
 		}
 	}()
 
-	return reqOutStream, resOutStream
+	go func() {
+		// Create a new Sarama configuration for the Kafka consumer.
+		config := sarama.NewConfig()
+
+		var consumer sarama.Consumer
+		var err error
+
+		for {
+			// Create a new Kafka consumer using the specified configuration and broker addresses.
+			consumer, err = sarama.NewConsumer(kafkaBrokers, config)
+			if err != nil {
+				slog.LogAttrs(context.Background(), slog.LevelError, "kafkaConsumerBytes", slog.String("NewConsumer", err.Error()))
+				time.Sleep(time.Second * 3)
+			} else {
+				break
+			}
+		}
+
+		// Ensure the Kafka consumer is closed when the goroutine ends (deferred execution).
+		defer consumer.Close()
+
+		// Create a partition consumer for the specified topic, partition, and starting offset.
+		// The starting offset is set to sarama.OffsetNewest, which means the consumer will
+		// start consuming messages from the latest available offset.
+		// reqConsumer, err := consumer.ConsumePartition(req, 0, sarama.OffsetNewest)
+		// if err != nil {
+		// 	log.Fatal("Failed to start partition consumer:", err)
+		// }
+		// // Ensure the partition consumer is closed when the function ends (deferred execution).
+		// defer reqConsumer.Close()
+
+		// resConsumer, err := consumer.ConsumePartition(resTopic, 0, sarama.OffsetNewest)
+		// if err != nil {
+		// 	panic(fmt.Sprint("Failed to start partition consumer:", err))
+		// }
+
+		var resConsumer sarama.PartitionConsumer
+
+		for {
+			// Create a partition consumer for the proof topic, partition 0, starting at the
+			// latest available offset (sarama.OffsetNewest).
+			resConsumer, err = consumer.ConsumePartition(proofTopic, 0, sarama.OffsetNewest)
+			if err != nil {
+				slog.LogAttrs(context.Background(), slog.LevelError, "kafkaConsumerBytes", slog.String("ConsumePartition", err.Error()), slog.String("topic", proofTopic))
+				time.Sleep(time.Second * 3)
+			} else {
+				break
+			}
+		}
+
+		// Ensure the partition consumer is closed when the goroutine ends (deferred execution).
+		defer resConsumer.Close()
+
+		// Continuously listen for messages from the partition consumer.
+		for {
+			select {
+			case message := <-resConsumer.Messages():
+				if message == nil {
+					slog.LogAttrs(context.Background(), slog.LevelError, "kafkaConsumerBytes", slog.String("message", "nil message"), slog.String("topic", proofTopic))
+					continue
+				}
+				if message.Value == nil {
+					slog.LogAttrs(context.Background(), slog.LevelError, "kafkaConsumerBytes", slog.String("value", "nil message value"), slog.String("topic", proofTopic))
+					continue
+				}
+
+				pbMsg := pbUpstream.TaskProof{}
+				if err := gogoPbProto.Unmarshal(message.Value, &pbMsg); err != nil {
+					slog.LogAttrs(context.Background(), slog.LevelError, "kafkaConsumerBytes", slog.String("pbUnmarshal", err.Error()), slog.String("topic", proofTopic))
+					continue
+				}
+
+				baseAttributes := []slog.Attr{slog.String(TaskIdAtrr, pbMsg.TaskId)}
+				slog.LogAttrs(context.Background(), slog.LevelInfo, "<- kafka consumer", append(baseAttributes, slog.String("topic", proofTopic))...)
+
+				select {
+				case <-done:
+					return
+				case proofOutStream <- pbMsg:
+				}
+
+			case <-done:
+				return
+			}
+		}
+	}()
+
+	return reqOutStream, resOutStream, proofOutStream
 
 }
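
The new goroutine repeats the same connect-and-retry shape twice, once for sarama.NewConsumer and once for ConsumePartition. A minimal sketch of that pattern factored into a helper; retryEvery, op, and the broker address are hypothetical, not part of this commit, and the sarama import path is assumed:

    package main

    import (
    	"log/slog"
    	"time"

    	"github.com/IBM/sarama" // import path assumed; older code used github.com/Shopify/sarama
    )

    // retryEvery keeps calling fn until it succeeds, logging the failing
    // operation and sleeping between attempts.
    func retryEvery(interval time.Duration, op string, fn func() error) {
    	for {
    		err := fn()
    		if err == nil {
    			return
    		}
    		slog.Error("kafkaConsumerBytes", op, err.Error())
    		time.Sleep(interval)
    	}
    }

    func main() {
    	var consumer sarama.Consumer
    	retryEvery(3*time.Second, "NewConsumer", func() (err error) {
    		// Broker address is an example value.
    		consumer, err = sarama.NewConsumer([]string{"localhost:9092"}, sarama.NewConfig())
    		return err
    	})
    	defer consumer.Close()
    }
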
-func batchToQuestDb(done chan interface{}, reqStream chan pbUpstream.TaskContent, resStream chan pbUpstream.TaskReceipt, reqTableName string, resTableName, questAddr string) {
+func batchToQuestDb(done chan interface{}, reqStream chan pbUpstream.TaskContent, resStream chan pbUpstream.TaskReceipt, proofStream chan pbUpstream.TaskProof, reqTableName string, resTableName, proofTableName string, questAddr string) {
 
 	ctx := context.TODO()
 	addrOpt := qdb.WithAddress(questAddr)
@@ -423,6 +534,54 @@ func batchToQuestDb(done chan interface{}, reqStream chan pbUpstream.TaskContent
 				}
 			}
+		case proof := <-proofStream:
+			// pbUpstream.TaskProof fields, each written as a column below:
+			// TaskId, TaskFinishTimestamp, TaskType, TaskWorkload,
+			// TaskReqHash, TaskRespHash, TaskManagerSignature,
+			// TaskContainerSignature, TaskMinerSignature,
+			// TaskProfitAccount, TaskWorkerAccount
+
+			// TaskFinishTimestamp is nanoseconds since the Unix epoch; split it
+			// into whole seconds and the nanosecond remainder for time.Unix.
+			nanoseconds := int64(proof.TaskFinishTimestamp)
+			seconds := nanoseconds / 1e9
+
+			for {
+				sender.
+					Table(proofTableName).
+					Symbol("TaskType", fmt.Sprintf("%d", proof.TaskType)).
+					StringColumn("TaskId", proof.TaskId).
+					TimestampColumn("TaskFinishTimestamp", time.Unix(seconds, nanoseconds%1e9)).
+					Int64Column("TaskWorkload", int64(proof.TaskWorkload)).
+					StringColumn("TaskReqHash", proof.TaskReqHash).
+					StringColumn("TaskRespHash", proof.TaskRespHash).
+					StringColumn("TaskManagerSignature", proof.TaskManagerSignature).
+					StringColumn("TaskContainerSignature", proof.TaskContainerSignature).
+					StringColumn("TaskMinerSignature", proof.TaskMinerSignature).
+					StringColumn("TaskProfitAccount", proof.TaskProfitAccount).
+					StringColumn("TaskWorkerAccount", proof.TaskWorkerAccount).
+					AtNow(ctx)
+
+				err = sender.Flush(ctx)
+				if err != nil {
+					slog.LogAttrs(context.Background(), slog.LevelError, "batchToQuestDb", slog.String("sender.Flush", err.Error()))
+					time.Sleep(time.Second * 3)
+				} else {
+					break
+				}
+			}
+
 		case <-done:
 			return
 		}
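
The seconds/remainder split above is equivalent to passing the whole nanosecond value to time.Unix in its second argument; a small self-contained check (the timestamp value is an example, not from the commit):

    package main

    import (
    	"fmt"
    	"time"
    )

    func main() {
    	// TaskFinishTimestamp arrives as nanoseconds since the Unix epoch.
    	ns := int64(1700000000123456789)

    	// The split used in the diff: whole seconds plus the nanosecond remainder.
    	t1 := time.Unix(ns/1e9, ns%1e9)

    	// Equivalent single-call form.
    	t2 := time.Unix(0, ns)

    	fmt.Println(t1.Equal(t2)) // true
    }

Writing TaskType via Symbol rather than StringColumn stores it as a QuestDB symbol (interned and indexable), which suits a low-cardinality tag; the hashes, signatures, and accounts stay plain string columns.
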
@@ -522,7 +681,7 @@ var withBillDb bool
 
 func main() {
 
-	var questAddr, kafkaBroker, callbackAddrP, listenIpPort, aigcProduceTopic, aigcConsumerTopic, logserver string
+	var questAddr, kafkaBroker, callbackAddrP, listenIpPort, aigcProduceTopic, aigcTaskreceiptTopic, aigcTaskProof, logserver string
 	var redisAddr, redisPassWd, mysqlAddr string
 	var mysqlPort int
@@ -550,7 +709,9 @@ func main() {
 
 	flag.StringVar(&callbackAddrP, "callbackIpAddr", "localhost:6001", "ip:port")
 	flag.StringVar(&listenIpPort, "listenIpPort", "0.0.0.0:6001", "api listen on ip:port")
 	flag.StringVar(&aigcProduceTopic, "aigcProduceTopic", "pbaigc", "produce topic, default value is: pbaigc")
-	flag.StringVar(&aigcConsumerTopic, "aigcConsumerTopic", "taskreceipt", "consumer topic, default value is: taskreceipt")
+	flag.StringVar(&aigcTaskreceiptTopic, "aigcTaskreceiptTopic", "taskreceipt", "consumer topic, default value is: taskreceipt")
+	flag.StringVar(&aigcTaskProof, "aigcTaskProof", "taskproof", "TaskProof topic, default value is: taskproof")
 
 	flag.BoolVar(&withBillDb, "withbilldb", true, "enable with bill db, or simulate")
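
For reference, a launch line exercising the renamed receipt flag and the new proof flag would look like this, with the defaults written out explicitly (the binary name is hypothetical):

    ./apiserver -aigcProduceTopic=pbaigc -aigcTaskreceiptTopic=taskreceipt -aigcTaskProof=taskproof
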
@@ -586,7 +747,8 @@ func main() {
 
 	kafkaAttributes := []slog.Attr{
 		slog.String("addr", kafkaBroker),
 		slog.String("task-topic", aigcProduceTopic),
-		slog.String("bill-topic", aigcConsumerTopic),
+		slog.String("bill-topic", aigcTaskreceiptTopic),
+		slog.String("proof-topic", aigcTaskProof),
 	}
 
 	mysqlAttributes := []slog.Attr{
@@ -682,9 +844,9 @@ func main() {
 
 	done := make(chan interface{})
 
-	reqToQuestDb, resToQuestDb := kafkaConsumerBytes(done, aigcProduceTopic, aigcConsumerTopic)
+	reqToQuestDb, resToQuestDb, proofToQuestDb := kafkaConsumerBytes(done, aigcProduceTopic, aigcTaskreceiptTopic, aigcTaskProof)
 
-	go batchToQuestDb(done, reqToQuestDb, resToQuestDb, "tasks", "bills", questAddr)
+	go batchToQuestDb(done, reqToQuestDb, resToQuestDb, proofToQuestDb, "tasks", "bills", "proof", questAddr)
 
 	go recordUUID()
 
 	// Create a new instance of the Fiber web framework.
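
All three goroutines started above share the one done channel, so closing it tears down the whole consumer-to-QuestDB pipeline at once. A minimal sketch of that shutdown pattern; the signal handling is an assumption for illustration and does not appear in this hunk:

    package main

    import (
    	"os"
    	"os/signal"
    	"syscall"
    )

    func main() {
    	// One done channel is shared by every pipeline goroutine, as in main() above.
    	done := make(chan interface{})

    	// ... start kafkaConsumerBytes / batchToQuestDb goroutines here ...

    	// Hypothetical shutdown wiring: close done on SIGINT/SIGTERM so each
    	// goroutine's <-done case fires and its deferred Close calls run.
    	sig := make(chan os.Signal, 1)
    	signal.Notify(sig, syscall.SIGINT, syscall.SIGTERM)
    	<-sig
    	close(done)
    }
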
@@ -714,10 +876,9 @@ func main() {
 
 	callbackGroup := app.Group("/callback")
 
 	apiGroupV1 := apiGroup.Group("/v1")
-	apiGroupV1.Post("/*", ApiOrJWT)
+	apiGroupV1.Post("/*", slogfiber.NewWithConfig(slog.Default(), cfg), ApiOrJWT)
 
 	jwtGroupV1 := jwtGroup.Group("/v1")
 	jwtGroupV1.Post("/*", ApiOrJWT)
 
 	callbackGroupV1 := callbackGroup.Group("/v1")
@@ -760,6 +921,19 @@ var replanceQueryTask *model.TaskType
 
 func ApiOrJWT(c *fiber.Ctx) error {
 
+	cfg := slogfiber.Config{
+		WithUserAgent:      true,
+		WithRequestID:      true,
+		WithRequestBody:    true,
+		WithRequestHeader:  true,
+		WithResponseBody:   false,
+		WithResponseHeader: true,
+		WithSpanID:         true,
+		WithTraceID:        true,
+	}
+
+	slogfiber.NewWithConfig(slog.Default(), cfg)
+
 	reqHeaders := c.GetReqHeaders()
 	slogfiber.AddCustomAttributes(c, slog.String(TaskIdAtrr, reqHeaders[TaskIdAtrr][0]))
...
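
Note that slogfiber.NewWithConfig returns the middleware handler itself, so the bare call added at the top of ApiOrJWT constructs a handler and discards it on every request; the one doing real work is the handler passed to apiGroupV1.Post in main. A minimal registration sketch, assuming the samber/slog-fiber and gofiber/fiber/v2 import paths:

    package main

    import (
    	"log/slog"

    	"github.com/gofiber/fiber/v2"
    	slogfiber "github.com/samber/slog-fiber" // import path assumed from the identifiers above
    )

    func main() {
    	app := fiber.New()

    	cfg := slogfiber.Config{
    		WithRequestID: true,
    		WithTraceID:   true,
    	}

    	// Register once so every request is logged; the return value of
    	// NewWithConfig is the handler, so it must be passed to Use (or to a
    	// route, as main() does above), not discarded.
    	app.Use(slogfiber.NewWithConfig(slog.Default(), cfg))

    	app.Listen("0.0.0.0:6001")
    }
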