Commit 180276e9 authored by Your Name's avatar Your Name

fix timestamp

parent 288aaa27
...@@ -212,14 +212,17 @@ func batchToQuestDb(done chan interface{}, reqStream chan pbUpstream.TaskContent ...@@ -212,14 +212,17 @@ func batchToQuestDb(done chan interface{}, reqStream chan pbUpstream.TaskContent
select { select {
case task := <-resStream: case task := <-resStream:
nanoseconds := int64(task.TaskFinishTime)
seconds := nanoseconds / 1e9
sender.Table(resTableName). sender.Table(resTableName).
Symbol("type", fmt.Sprintf("%d", task.TaskId)). Symbol("type", fmt.Sprintf("%d", task.TaskId)).
Symbol("uid", fmt.Sprintf("%d", task.TaskUid)). Symbol("uid", task.TaskUid).
Symbol("profit_acc", task.TaskProfitAccount). Symbol("profit_acc", task.TaskProfitAccount).
Symbol("worker_acc", task.TaskWorkerAccount). Symbol("worker_acc", task.TaskWorkerAccount).
Symbol("result", task.TaskResult). Symbol("result", task.TaskResult).
StringColumn("id", task.TaskUuid). StringColumn("id", task.TaskUuid).
TimestampColumn("time", time.UnixMilli(int64(task.TaskFinishTime))). TimestampColumn("time", time.Unix(seconds, nanoseconds%1e9)).
Int64Column("fee", task.TaskFee). Int64Column("fee", task.TaskFee).
Int64Column("workload", int64(task.TaskWorkload)). Int64Column("workload", int64(task.TaskWorkload)).
Int64Column("out_len", int64(task.TaskOutLen)). Int64Column("out_len", int64(task.TaskOutLen)).
...@@ -232,17 +235,37 @@ func batchToQuestDb(done chan interface{}, reqStream chan pbUpstream.TaskContent ...@@ -232,17 +235,37 @@ func batchToQuestDb(done chan interface{}, reqStream chan pbUpstream.TaskContent
} }
case task := <-reqStream: case task := <-reqStream:
// sender.
// Table(reqTableName).
// Symbol("type", fmt.Sprintf("%d", randomType())).
// Symbol("uid", fmt.Sprintf("%d", randomUID())).
// StringColumn("id", uuid.New().String()).
// TimestampColumn("time", time.Now()).
// Int64Column("fee", int64(randomFeeAndWorkload())).
// Int64Column("in_len", int64(randomInput())).
// AtNow(ctx)
_ = task
taskFeeAsInt, err := strconv.Atoi(task.TaskFee) taskFeeAsInt, err := strconv.Atoi(task.TaskFee)
if err != nil { if err != nil {
fmt.Printf("task.TaskFee string to int error: %v\n", err) fmt.Printf("task.TaskFee string to int error: %v\n", err)
continue continue
} }
sender.Table(reqTableName). nanoseconds := int64(task.TaskTimestamp)
seconds := nanoseconds / 1e9
// 使用Unix函数转换为time.Time
// t := time.Unix(seconds, nanoseconds%1e9)
// time.Unix()
sender.
Table(reqTableName).
Symbol("type", fmt.Sprintf("%d", task.TaskType)). Symbol("type", fmt.Sprintf("%d", task.TaskType)).
Symbol("uid", fmt.Sprintf("%d", task.TaskUid)). Symbol("uid", task.TaskUid).
StringColumn("id", task.TaskUuid). StringColumn("id", task.TaskUuid).
TimestampColumn("time", time.UnixMilli(int64(task.TaskTimestamp))). TimestampColumn("time", time.Unix(seconds, nanoseconds%1e9)).
Int64Column("fee", int64(taskFeeAsInt)). Int64Column("fee", int64(taskFeeAsInt)).
Int64Column("in_len", int64(task.TaskInLen)). Int64Column("in_len", int64(task.TaskInLen)).
AtNow(ctx) AtNow(ctx)
...@@ -260,6 +283,22 @@ func batchToQuestDb(done chan interface{}, reqStream chan pbUpstream.TaskContent ...@@ -260,6 +283,22 @@ func batchToQuestDb(done chan interface{}, reqStream chan pbUpstream.TaskContent
} }
} }
func randomInput() int {
return rand.Intn(1536)
}
func randomUID() int {
return rand.Intn(10000)
}
func randomFeeAndWorkload() int {
return rand.Intn(100)
}
func randomType() int {
return rand.Intn(50)
}
type ResponseJson struct { type ResponseJson struct {
TaskUUID string TaskUUID string
TaskResult []byte TaskResult []byte
...@@ -361,7 +400,6 @@ func main() { ...@@ -361,7 +400,6 @@ func main() {
} }
var cmd string var cmd string
var fee int64
cmd = task.Cmd cmd = task.Cmd
pbMsg := pbUpstream.TaskContent{ pbMsg := pbUpstream.TaskContent{
...@@ -370,10 +408,11 @@ func main() { ...@@ -370,10 +408,11 @@ func main() {
TaskType: 1, TaskType: 1,
TaskCmd: cmd, TaskCmd: cmd,
TaskParam: []byte(reqHeaders["Task-Id"][0]), TaskParam: []byte(reqHeaders["Task-Id"][0]),
TaskTimestamp: uint64(time.Now().UnixMilli()), TaskTimestamp: uint64(time.Now().UnixNano()),
TaskCallback: "http://192.168.1.220:6000/v1/callback", TaskCallback: "http://192.168.1.220:6000/v1/callback",
TaskUid: reqHeaders["X-Consumer-Custom-Id"][0], TaskUid: reqHeaders["X-Consumer-Custom-Id"][0],
TaskFee: fmt.Sprintf("%d", fee), TaskFee: fmt.Sprintf("%d", task.Price),
TaskInLen: int32(len(c.Body())),
} }
msgAsJson, err := json.Marshal(pbMsg) msgAsJson, err := json.Marshal(pbMsg)
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment