Commit 19b8e4e1 authored by Your Name's avatar Your Name

update field

parent 1b3fb251
...@@ -30,7 +30,7 @@ func req(uuid string) chan pbUpstream.TaskResponse { ...@@ -30,7 +30,7 @@ func req(uuid string) chan pbUpstream.TaskResponse {
func res(res pbUpstream.TaskResponse) { func res(res pbUpstream.TaskResponse) {
if v, ok := recordmap.LoadAndDelete(res.TaskUuid); ok { if v, ok := recordmap.LoadAndDelete(res.TaskId); ok {
resAsV, ok := v.(chan pbUpstream.TaskResponse) resAsV, ok := v.(chan pbUpstream.TaskResponse)
if ok { if ok {
resAsV <- res resAsV <- res
......
...@@ -60,7 +60,7 @@ type bytesAndHeader struct { ...@@ -60,7 +60,7 @@ type bytesAndHeader struct {
var producerMessagesBytes = make(chan bytesAndHeader, 1000) var producerMessagesBytes = make(chan bytesAndHeader, 1000)
//var TaskUUIDInstream = make(chan string, 1000) //var TaskIdInstream = make(chan string, 1000)
// Channel to send messages from the Kafka consumer to the /consumer endpoint // Channel to send messages from the Kafka consumer to the /consumer endpoint
var consumerMessages = make(chan string) var consumerMessages = make(chan string)
...@@ -157,7 +157,7 @@ func kafkaConsumerBytes(done chan interface{}, req, resTopic string) (chan pbUps ...@@ -157,7 +157,7 @@ func kafkaConsumerBytes(done chan interface{}, req, resTopic string) (chan pbUps
continue continue
} }
fmt.Println("reqConsumer msg", pbMsg.TaskUuid) fmt.Println("reqConsumer msg", pbMsg.TaskId)
select { select {
case <-done: case <-done:
...@@ -183,7 +183,7 @@ func kafkaConsumerBytes(done chan interface{}, req, resTopic string) (chan pbUps ...@@ -183,7 +183,7 @@ func kafkaConsumerBytes(done chan interface{}, req, resTopic string) (chan pbUps
continue continue
} }
fmt.Println("resConsumer msg", pbMsg.TaskUuid, "len(resOutStream)", len(resOutStream)) fmt.Println("resConsumer msg", pbMsg.TaskId, "len(resOutStream)", len(resOutStream))
select { select {
case <-done: case <-done:
...@@ -229,7 +229,7 @@ func batchToQuestDb(done chan interface{}, reqStream chan pbUpstream.TaskContent ...@@ -229,7 +229,7 @@ func batchToQuestDb(done chan interface{}, reqStream chan pbUpstream.TaskContent
Symbol("profit_acc", task.TaskProfitAccount). Symbol("profit_acc", task.TaskProfitAccount).
Symbol("worker_acc", task.TaskWorkerAccount). Symbol("worker_acc", task.TaskWorkerAccount).
Symbol("result", task.TaskResult). Symbol("result", task.TaskResult).
StringColumn("id", task.TaskUuid). StringColumn("id", task.TaskId).
TimestampColumn("time", time.Unix(seconds, nanoseconds%1e9)). TimestampColumn("time", time.Unix(seconds, nanoseconds%1e9)).
Int64Column("fee", task.TaskFee). Int64Column("fee", task.TaskFee).
Int64Column("workload", int64(task.TaskWorkload)). Int64Column("workload", int64(task.TaskWorkload)).
...@@ -274,7 +274,7 @@ func batchToQuestDb(done chan interface{}, reqStream chan pbUpstream.TaskContent ...@@ -274,7 +274,7 @@ func batchToQuestDb(done chan interface{}, reqStream chan pbUpstream.TaskContent
Table(reqTableName). Table(reqTableName).
Symbol("type", fmt.Sprintf("%d", task.TaskType)). Symbol("type", fmt.Sprintf("%d", task.TaskType)).
Symbol("uid", task.TaskUid). Symbol("uid", task.TaskUid).
StringColumn("id", task.TaskUuid). StringColumn("id", task.TaskId).
TimestampColumn("time", time.Unix(seconds, nanoseconds%1e9)). TimestampColumn("time", time.Unix(seconds, nanoseconds%1e9)).
Int64Column("fee", int64(taskFeeAsInt)). Int64Column("fee", int64(taskFeeAsInt)).
Int64Column("in_len", int64(task.TaskInLen)). Int64Column("in_len", int64(task.TaskInLen)).
...@@ -294,7 +294,7 @@ func batchToQuestDb(done chan interface{}, reqStream chan pbUpstream.TaskContent ...@@ -294,7 +294,7 @@ func batchToQuestDb(done chan interface{}, reqStream chan pbUpstream.TaskContent
} }
type ResponseJson struct { type ResponseJson struct {
TaskUUID string TaskId string
TaskResult []byte TaskResult []byte
TaskUid string TaskUid string
TaskFee string TaskFee string
...@@ -548,9 +548,9 @@ func ApiAndJWT(c *fiber.Ctx) error { ...@@ -548,9 +548,9 @@ func ApiAndJWT(c *fiber.Ctx) error {
cmd = task.Cmd cmd = task.Cmd
pbMsg := pbUpstream.TaskContent{ pbMsg := pbUpstream.TaskContent{
TaskUuid: reqHeaders["Task-Id"][0], TaskId: reqHeaders["Task-Id"][0],
TaskId: uint64(task.ID), TaskType: uint64(task.ID),
TaskType: 1, TaskKind: 1,
TaskCmd: cmd, TaskCmd: cmd,
TaskParam: c.Body(), //[]byte(reqHeaders["Task-Id"][0]), TaskParam: c.Body(), //[]byte(reqHeaders["Task-Id"][0]),
TaskTimestamp: uint64(time.Now().UnixNano()), TaskTimestamp: uint64(time.Now().UnixNano()),
...@@ -582,9 +582,9 @@ func ApiAndJWT(c *fiber.Ctx) error { ...@@ -582,9 +582,9 @@ func ApiAndJWT(c *fiber.Ctx) error {
Bytes: pbBytes, Bytes: pbBytes,
} }
fmt.Println("pbMsg.TaskUid--------------", pbMsg.TaskUuid) fmt.Println("pbMsg.TaskUid--------------", pbMsg.TaskId)
wait := req(pbMsg.TaskUuid) wait := req(pbMsg.TaskId)
resAsPb := <-wait resAsPb := <-wait
fmt.Println("resAsPb.TaskResultHeader", string(resAsPb.TaskResultHeader), len(resAsPb.TaskResultHeader)) fmt.Println("resAsPb.TaskResultHeader", string(resAsPb.TaskResultHeader), len(resAsPb.TaskResultHeader))
...@@ -617,7 +617,7 @@ func ApiAndJWT(c *fiber.Ctx) error { ...@@ -617,7 +617,7 @@ func ApiAndJWT(c *fiber.Ctx) error {
} }
// resAsJson := ResponseJson{ // resAsJson := ResponseJson{
// TaskUUID: resAsPb.TaskUuid, // TaskId: resAsPb.TaskId,
// TaskResult: resAsPb.GetTaskResultBody(), // TaskResult: resAsPb.GetTaskResultBody(),
// TaskUid: resAsPb.TaskUid, // TaskUid: resAsPb.TaskUid,
// TaskFee: resAsPb.TaskFee, // TaskFee: resAsPb.TaskFee,
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment