Commit 34a3d3fb authored by Your Name's avatar Your Name

add bill

parent 803bfb41
...@@ -16,11 +16,17 @@ import ( ...@@ -16,11 +16,17 @@ import (
const ( const (
RedisAddr = "localhost:6379" RedisAddr = "localhost:6379"
RetryCount = 5 RetryCount = 5
BatchSize = 100 BatchSize = 1000
) )
var Backo = backo.DefaultBacko() var Backo = backo.DefaultBacko()
var Bill *Metering
func init() {
Bill = NewMeteringClient()
}
type UserFee struct { type UserFee struct {
User string User string
Fee decimal.Decimal Fee decimal.Decimal
...@@ -60,13 +66,16 @@ func WithBatchSize(batchSize int) MeteringOption { ...@@ -60,13 +66,16 @@ func WithBatchSize(batchSize int) MeteringOption {
} }
type Metering struct { type Metering struct {
redisCli *redis.Client //redisCli *redis.Client
// IntervalSeconds is the frequency at which messages are flushed. // IntervalSeconds is the frequency at which messages are flushed.
dao *Dao
IntervalSeconds time.Duration IntervalSeconds time.Duration
BatchSize int BatchSize int
// channels // channels
msgs chan UserFee msgs chan BillReq
quit chan struct{} quit chan struct{}
shutdown chan struct{} shutdown chan struct{}
...@@ -83,20 +92,20 @@ type Metering struct { ...@@ -83,20 +92,20 @@ type Metering struct {
} }
// Create a new instance // Create a new instance
func NewMeteringClient(apiKey string, opts ...MeteringOption) *Metering { func NewMeteringClient(opts ...MeteringOption) *Metering {
rdb := redis.NewClient(&redis.Options{
Addr: RedisAddr,
Password: "", // no password set
DB: 0, // use default DB
Protocol: 3, // specify 2 for RESP 2 or 3 for RESP 3
})
m := &Metering{ m := &Metering{
redisCli: rdb, dao: &Dao{
redis.NewClient(&redis.Options{
Addr: RedisAddr,
Password: "", // no password set
DB: 0, // use default DB
Protocol: 3, // specify 2 for RESP 2 or 3 for RESP 3
}),
},
IntervalSeconds: 1 * time.Second, IntervalSeconds: 1 * time.Second,
BatchSize: BatchSize, BatchSize: BatchSize,
msgs: make(chan UserFee, BatchSize), msgs: make(chan BillReq, BatchSize),
quit: make(chan struct{}), quit: make(chan struct{}),
shutdown: make(chan struct{}), shutdown: make(chan struct{}),
now: time.Now, now: time.Now,
...@@ -111,11 +120,26 @@ func NewMeteringClient(apiKey string, opts ...MeteringOption) *Metering { ...@@ -111,11 +120,26 @@ func NewMeteringClient(apiKey string, opts ...MeteringOption) *Metering {
return m return m
} }
type BillReq struct {
Msg UserFee
Accept chan bool
}
// Queue a metering message to send to Ingest API. Messages are flushes periodically at IntervalSeconds or when the BatchSize limit is exceeded. // Queue a metering message to send to Ingest API. Messages are flushes periodically at IntervalSeconds or when the BatchSize limit is exceeded.
func (m *Metering) Meter(msg UserFee) error { func (m *Metering) Meter(msg UserFee) bool {
fmt.Printf("Queuing meter message: %+v", msg) fmt.Printf("Queuing meter message: %+v", msg)
m.queue(msg) m.once.Do(m.startLoop)
return nil
res := make(chan bool)
m.msgs <- BillReq{
Msg: msg,
Accept: res,
}
accept := <-res
return accept
} }
// Start goroutine for concurrent execution to monitor channels // Start goroutine for concurrent execution to monitor channels
...@@ -123,12 +147,12 @@ func (m *Metering) startLoop() { ...@@ -123,12 +147,12 @@ func (m *Metering) startLoop() {
go m.loop() go m.loop()
} }
// Queue the metering message // // Queue the metering message
func (m *Metering) queue(msg UserFee) { // func (m *Metering) queue(msg UserFee) {
m.once.Do(m.startLoop) // m.once.Do(m.startLoop)
//send message to channel // //send message to channel
m.msgs <- msg // m.msgs <- msg
} // }
// Flush all messages in the queue, stop the timer, close all channels, shutdown the client // Flush all messages in the queue, stop the timer, close all channels, shutdown the client
func (m *Metering) Shutdown() error { func (m *Metering) Shutdown() error {
...@@ -145,7 +169,7 @@ func (m *Metering) Shutdown() error { ...@@ -145,7 +169,7 @@ func (m *Metering) Shutdown() error {
} }
// Sends batch to API asynchonrously and limits the number of concurrrent calls to API // Sends batch to API asynchonrously and limits the number of concurrrent calls to API
func (m *Metering) sendAsync(msgs []UserFee) { func (m *Metering) sendAsync(msgs []BillReq) {
m.mutex.Lock() m.mutex.Lock()
//support 1000 asyncrhonus calls //support 1000 asyncrhonus calls
...@@ -173,7 +197,7 @@ func (m *Metering) sendAsync(msgs []UserFee) { ...@@ -173,7 +197,7 @@ func (m *Metering) sendAsync(msgs []UserFee) {
} }
// Send the batch request with retry // Send the batch request with retry
func (m *Metering) send(msgs []UserFee) error { func (m *Metering) send(msgs []BillReq) error {
if len(msgs) == 0 { if len(msgs) == 0 {
return nil return nil
} }
...@@ -192,6 +216,10 @@ func (m *Metering) send(msgs []UserFee) error { ...@@ -192,6 +216,10 @@ func (m *Metering) send(msgs []UserFee) error {
fmt.Printf("Api call retry attempt: %d", i) fmt.Printf("Api call retry attempt: %d", i)
} }
if err = m.dao.ReqRes(msgs); err == nil {
return nil
}
// if err = m.sendToRedis(msgs); err == nil { // if err = m.sendToRedis(msgs); err == nil {
// return nil // return nil
// } // }
...@@ -206,7 +234,7 @@ func (m *Metering) send(msgs []UserFee) error { ...@@ -206,7 +234,7 @@ func (m *Metering) send(msgs []UserFee) error {
// Run the listener loop in a separate thread to monitor all channels // Run the listener loop in a separate thread to monitor all channels
func (m *Metering) loop() { func (m *Metering) loop() {
var msgs []UserFee var msgs []BillReq
tick := time.NewTicker(m.IntervalSeconds) tick := time.NewTicker(m.IntervalSeconds)
fmt.Println("Listener thread and timer have started") fmt.Println("Listener thread and timer have started")
fmt.Printf("loop() ==> Effective batch size %d interval in seconds %d retry attempts %d", m.BatchSize, m.IntervalSeconds, RetryCount) fmt.Printf("loop() ==> Effective batch size %d interval in seconds %d retry attempts %d", m.BatchSize, m.IntervalSeconds, RetryCount)
...@@ -223,7 +251,7 @@ func (m *Metering) loop() { ...@@ -223,7 +251,7 @@ func (m *Metering) loop() {
if len(msgs) >= m.BatchSize { if len(msgs) >= m.BatchSize {
fmt.Printf("exceeded %d messages – flushing", m.BatchSize) fmt.Printf("exceeded %d messages – flushing", m.BatchSize)
m.sendAsync(msgs) m.sendAsync(msgs)
msgs = make([]UserFee, 0, m.BatchSize) msgs = make([]BillReq, 0, m.BatchSize)
} }
//timer event //timer event
...@@ -231,9 +259,9 @@ func (m *Metering) loop() { ...@@ -231,9 +259,9 @@ func (m *Metering) loop() {
if len(msgs) > 0 { if len(msgs) > 0 {
fmt.Printf("interval reached - flushing %d", len(msgs)) fmt.Printf("interval reached - flushing %d", len(msgs))
m.sendAsync(msgs) m.sendAsync(msgs)
msgs = make([]UserFee, 0, m.BatchSize) msgs = make([]BillReq, 0, m.BatchSize)
} else { } else {
fmt.Println("interval reached – nothing to send") //fmt.Println("interval reached – nothing to send")
} }
//process shutdown //process shutdown
......
...@@ -12,6 +12,62 @@ type Dao struct { ...@@ -12,6 +12,62 @@ type Dao struct {
*redis.Client *redis.Client
} }
func (d *Dao) ReqRes(msgs []BillReq) error {
usersMap := make(map[string][]BillReq, len(msgs))
users := make([]string, 0, len(msgs))
for _, msg := range msgs {
if v, ok := usersMap[msg.Msg.User]; ok {
v = append(v, msg)
usersMap[msg.Msg.User] = v
} else {
userBillReq := make([]BillReq, 0, 10)
userBillReq = append(userBillReq, msg)
usersMap[msg.Msg.User] = userBillReq
users = append(users, msg.Msg.User)
}
}
usersBalance, err := d.GetBalanceSubCharge(users)
if err != nil {
return err
}
userCharge := make(map[string]decimal.Decimal, len(usersMap))
for k, v := range usersMap {
for _, billReq := range v {
if usersBalance[k].Cmp(billReq.Msg.Fee) == 1 {
if old, ok := userCharge[k]; ok {
userCharge[k] = old.Add(billReq.Msg.Fee)
} else {
userCharge[k] = billReq.Msg.Fee
}
}
}
}
if err := d.ChargeIncrby(context.Background(), userCharge); err != nil {
return err
}
for k, v := range usersMap {
for _, billReq := range v {
if usersBalance[k].Cmp(billReq.Msg.Fee) == 1 {
billReq.Accept <- true
} else {
billReq.Accept <- false
}
}
}
return nil
}
func (d *Dao) GetBalanceSubCharge(users []string) (map[string]decimal.Decimal, error) { func (d *Dao) GetBalanceSubCharge(users []string) (map[string]decimal.Decimal, error) {
l := len(users) l := len(users)
...@@ -82,16 +138,15 @@ func (d *Dao) GetBalanceSubCharge(users []string) (map[string]decimal.Decimal, e ...@@ -82,16 +138,15 @@ func (d *Dao) GetBalanceSubCharge(users []string) (map[string]decimal.Decimal, e
} }
// INCRBY // INCRBY
func (d *Dao) ChargeIncrby(ctx context.Context, users []UserFee) error { func (d *Dao) ChargeIncrby(ctx context.Context, users map[string]decimal.Decimal) error {
pipe := d.Pipeline() pipe := d.Pipeline()
for _, user := range users { for user, fee := range users {
pipe.IncrBy(ctx, "charge:"+user.User, user.Fee.IntPart()) pipe.IncrBy(ctx, "charge:"+user, fee.IntPart())
} }
_, err := pipe.Exec(ctx) _, err := pipe.Exec(ctx)
return err return err
} }
...@@ -8,5 +8,6 @@ import ( ...@@ -8,5 +8,6 @@ import (
var logger = func(c *fiber.Ctx) { var logger = func(c *fiber.Ctx) {
log.Printf("Request: %s %s", c.Method(), c.Path()) log.Printf("Request: %s %s", c.Method(), c.Path())
c.Next() c.Next()
} }
...@@ -10,6 +10,7 @@ import ( ...@@ -10,6 +10,7 @@ import (
"github.com/IBM/sarama" "github.com/IBM/sarama"
"github.com/gofiber/fiber/v2" "github.com/gofiber/fiber/v2"
"github.com/shopspring/decimal"
// "github.com/gogo/protobuf/proto" // "github.com/gogo/protobuf/proto"
// "github.com/gogo/protobuf/types" // "github.com/gogo/protobuf/types"
...@@ -248,8 +249,10 @@ const ( ...@@ -248,8 +249,10 @@ const (
) )
const ( const (
ChatCompletionsFee = "10" ChatCompletionsFeeStr = "10"
ImagesGenerationsFee = "20" ImagesGenerationsFeeStr = "20"
ChatCompletionsFee = 10
ImagesGenerationsFee = 20
) )
//POST //POST
...@@ -309,7 +312,7 @@ func main() { ...@@ -309,7 +312,7 @@ func main() {
TaskTimestamp: uint64(time.Now().UnixMilli()), TaskTimestamp: uint64(time.Now().UnixMilli()),
TaskCallback: "http://192.168.1.220:6000/callback", TaskCallback: "http://192.168.1.220:6000/callback",
TaskUid: reqHeaders["X-Consumer-Custom-Id"], TaskUid: reqHeaders["X-Consumer-Custom-Id"],
TaskFee: ChatCompletionsFee, TaskFee: ChatCompletionsFeeStr,
} }
pbBytes, err := gogoPbProto.Marshal(&pbMsg) pbBytes, err := gogoPbProto.Marshal(&pbMsg)
...@@ -324,9 +327,20 @@ func main() { ...@@ -324,9 +327,20 @@ func main() {
// res = append(res, reqHeaders["Task-Id"]...) // res = append(res, reqHeaders["Task-Id"]...)
// res = append(res, body...) // res = append(res, body...)
producerMessagesBytes <- bytesAndHeader{ accept := Bill.Meter(UserFee{
Bytes: pbBytes, User: reqHeaders["X-Consumer-Custom-Id"],
HttpHeader: reqHeaders, Fee: decimal.NewFromInt(ChatCompletionsFee),
})
if accept {
producerMessagesBytes <- bytesAndHeader{
Bytes: pbBytes,
HttpHeader: reqHeaders,
}
return c.SendStatus(200)
} else {
return c.SendString("your balance can not pay the request fee")
} }
wait := req(pbMsg.TaskId) wait := req(pbMsg.TaskId)
...@@ -341,7 +355,7 @@ func main() { ...@@ -341,7 +355,7 @@ func main() {
return c.JSON(resAsJson) return c.JSON(resAsJson)
//return c.SendString("Message sent to Kafka producer.") //return c.SendString("Message sent to Kafka producer.")
}).Use(logger) })
app.Post("/images/generations", func(c *fiber.Ctx) error { app.Post("/images/generations", func(c *fiber.Ctx) error {
...@@ -365,7 +379,7 @@ func main() { ...@@ -365,7 +379,7 @@ func main() {
TaskTimestamp: uint64(time.Now().UnixMilli()), TaskTimestamp: uint64(time.Now().UnixMilli()),
TaskCallback: "http://192.168.1.220:6000/callback", TaskCallback: "http://192.168.1.220:6000/callback",
TaskUid: reqHeaders["X-Consumer-Custom-Id"], TaskUid: reqHeaders["X-Consumer-Custom-Id"],
TaskFee: ImagesGenerationsFee, TaskFee: ImagesGenerationsFeeStr,
} }
pbBytes, err := gogoPbProto.Marshal(&pbMsg) pbBytes, err := gogoPbProto.Marshal(&pbMsg)
...@@ -385,6 +399,10 @@ func main() { ...@@ -385,6 +399,10 @@ func main() {
HttpHeader: reqHeaders, HttpHeader: reqHeaders,
} }
return c.SendStatus(200)
return c.SendStatus(200)
wait := req(pbMsg.TaskId) wait := req(pbMsg.TaskId)
resAsPb := <-wait resAsPb := <-wait
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment