Commit 5d23461f authored by Your Name's avatar Your Name

fix bill and kafuka

parent 34a3d3fb
......@@ -103,7 +103,7 @@ func NewMeteringClient(opts ...MeteringOption) *Metering {
Protocol: 3, // specify 2 for RESP 2 or 3 for RESP 3
}),
},
IntervalSeconds: 1 * time.Second,
IntervalSeconds: 1 * time.Millisecond,
BatchSize: BatchSize,
msgs: make(chan BillReq, BatchSize),
quit: make(chan struct{}),
......
......@@ -337,23 +337,23 @@ func main() {
Bytes: pbBytes,
HttpHeader: reqHeaders,
}
return c.SendStatus(200)
} else {
return c.SendString("your balance can not pay the request fee")
}
wait := req(pbMsg.TaskId)
resAsPb := <-wait
wait := req(pbMsg.TaskId)
resAsPb := <-wait
resAsJson := ResponseJson{
TaskId: resAsPb.TaskId,
TaskResult: resAsPb.TaskResult,
TaskUid: resAsPb.TaskUid,
TaskFee: resAsPb.TaskFee,
}
resAsJson := ResponseJson{
TaskId: resAsPb.TaskId,
TaskResult: resAsPb.TaskResult,
TaskUid: resAsPb.TaskUid,
TaskFee: resAsPb.TaskFee,
return c.JSON(resAsJson)
//return c.SendStatus(200)
} else {
return c.SendString("your balance can not pay the request fee")
}
return c.JSON(resAsJson)
//return c.SendString("Message sent to Kafka producer.")
})
......@@ -389,31 +389,58 @@ func main() {
return c.SendString(fmt.Sprintf("pb error: %v", err.Error()))
}
accept := Bill.Meter(UserFee{
User: reqHeaders["X-Consumer-Custom-Id"],
Fee: decimal.NewFromInt(ChatCompletionsFee),
})
if accept {
producerMessagesBytes <- bytesAndHeader{
Bytes: pbBytes,
HttpHeader: reqHeaders,
}
wait := req(pbMsg.TaskId)
resAsPb := <-wait
resAsJson := ResponseJson{
TaskId: resAsPb.TaskId,
TaskResult: resAsPb.TaskResult,
TaskUid: resAsPb.TaskUid,
TaskFee: resAsPb.TaskFee,
}
return c.JSON(resAsJson)
//return c.SendStatus(200)
} else {
return c.SendString("your balance can not pay the request fee")
}
// res := make([]byte, 0, len(prefix)+len(reqHeaders["Task-Id"])+len(body))
// res = append(res, []byte(prefix)...)
// res = append(res, reqHeaders["Task-Id"]...)
// res = append(res, body...)
producerMessagesBytes <- bytesAndHeader{
Bytes: pbBytes,
HttpHeader: reqHeaders,
}
// producerMessagesBytes <- bytesAndHeader{
// Bytes: pbBytes,
// HttpHeader: reqHeaders,
// }
return c.SendStatus(200)
// return c.SendStatus(200)
return c.SendStatus(200)
// return c.SendStatus(200)
wait := req(pbMsg.TaskId)
resAsPb := <-wait
// wait := req(pbMsg.TaskId)
// resAsPb := <-wait
resAsJson := ResponseJson{
TaskId: resAsPb.TaskId,
TaskResult: resAsPb.TaskResult,
TaskUid: resAsPb.TaskUid,
TaskFee: resAsPb.TaskFee,
}
// resAsJson := ResponseJson{
// TaskId: resAsPb.TaskId,
// TaskResult: resAsPb.TaskResult,
// TaskUid: resAsPb.TaskUid,
// TaskFee: resAsPb.TaskFee,
// }
return c.JSON(resAsJson)
// return c.JSON(resAsJson)
//return c.SendStatus(200)
//return c.SendString("Message sent to Kafka producer.")
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment