Commit 803bfb41 authored by Your Name's avatar Your Name

add dao

parent fb3cb89c
Pipeline #663 failed with stages
package main
import (
"fmt"
"sync"
"time"
"github.com/segmentio/backo-go"
"github.com/shopspring/decimal"
"github.com/redis/go-redis/v9"
//"github.com/jehiah/go-strftime"
//"github.com/xtgo/uuid"
)
const (
RedisAddr = "localhost:6379"
RetryCount = 5
BatchSize = 100
)
var Backo = backo.DefaultBacko()
type UserFee struct {
User string
Fee decimal.Decimal
}
// Metering message
type MeterMessage struct {
UniqueId string `json:"uniqueId"`
MeterApiName string `json:"meterApiName"`
CustomerId string `json:"customerId"`
MeterValue float64 `json:"meterValue"`
MeterTimeInMillis int64 `json:"meterTimeInMillis"`
}
// func ExampleClient() *redis.Client {
// url := "redis://user:password@localhost:6379/0?protocol=3"
// opts, err := redis.ParseURL(url)
// if err != nil {
// panic(err)
// }
// return redis.NewClient(opts)
// }
type MeteringOption func(*Metering)
func WithIntervalSeconds(intervalSeconds time.Duration) MeteringOption {
return func(m *Metering) {
m.IntervalSeconds = intervalSeconds
}
}
func WithBatchSize(batchSize int) MeteringOption {
return func(m *Metering) {
m.BatchSize = batchSize
}
}
type Metering struct {
redisCli *redis.Client
// IntervalSeconds is the frequency at which messages are flushed.
IntervalSeconds time.Duration
BatchSize int
// channels
msgs chan UserFee
quit chan struct{}
shutdown chan struct{}
// helper functions
uid func() string
now func() time.Time
// Synch primitives to control number of concurrent calls to API
once sync.Once
wg sync.WaitGroup
mutex sync.Mutex
upcond sync.Cond
counter int
}
// Create a new instance
func NewMeteringClient(apiKey string, opts ...MeteringOption) *Metering {
rdb := redis.NewClient(&redis.Options{
Addr: RedisAddr,
Password: "", // no password set
DB: 0, // use default DB
Protocol: 3, // specify 2 for RESP 2 or 3 for RESP 3
})
m := &Metering{
redisCli: rdb,
IntervalSeconds: 1 * time.Second,
BatchSize: BatchSize,
msgs: make(chan UserFee, BatchSize),
quit: make(chan struct{}),
shutdown: make(chan struct{}),
now: time.Now,
}
//iterate through each option
for _, opt := range opts {
opt(m)
}
m.upcond.L = &m.mutex
return m
}
// Queue a metering message to send to Ingest API. Messages are flushes periodically at IntervalSeconds or when the BatchSize limit is exceeded.
func (m *Metering) Meter(msg UserFee) error {
fmt.Printf("Queuing meter message: %+v", msg)
m.queue(msg)
return nil
}
// Start goroutine for concurrent execution to monitor channels
func (m *Metering) startLoop() {
go m.loop()
}
// Queue the metering message
func (m *Metering) queue(msg UserFee) {
m.once.Do(m.startLoop)
//send message to channel
m.msgs <- msg
}
// Flush all messages in the queue, stop the timer, close all channels, shutdown the client
func (m *Metering) Shutdown() error {
fmt.Println("Running shutdown....")
m.once.Do(m.startLoop)
//start shutdown by sending message to quit channel
m.quit <- struct{}{}
//close the ingest meter messages channel
close(m.msgs)
//receive shutdown message, blocking call
<-m.shutdown
fmt.Println("Shutdown completed")
return nil
}
// Sends batch to API asynchonrously and limits the number of concurrrent calls to API
func (m *Metering) sendAsync(msgs []UserFee) {
m.mutex.Lock()
//support 1000 asyncrhonus calls
for m.counter == 1000 {
//sleep until signal
m.upcond.Wait()
}
m.counter++
m.mutex.Unlock()
m.wg.Add(1)
//spin new thread to call API with retry
go func() {
err := m.send(msgs)
if err != nil {
fmt.Println(err.Error())
}
m.mutex.Lock()
m.counter--
//signal the waiting blocked wait
m.upcond.Signal()
m.mutex.Unlock()
m.wg.Done()
}()
}
// Send the batch request with retry
func (m *Metering) send(msgs []UserFee) error {
if len(msgs) == 0 {
return nil
}
//serialize to json
// b, err := json.Marshal(msgs)
// if err != nil {
// return fmt.Errorf("error marshalling msgs: %s", err)
// }
var err error
//retry attempts to call Ingest API
for i := 0; i < RetryCount; i++ {
if i > 0 {
fmt.Printf("Api call retry attempt: %d", i)
}
// if err = m.sendToRedis(msgs); err == nil {
// return nil
// }
fmt.Printf("Retry attempt: %d error: %s ", i, err.Error())
Backo.Sleep(i)
}
return err
}
// Run the listener loop in a separate thread to monitor all channels
func (m *Metering) loop() {
var msgs []UserFee
tick := time.NewTicker(m.IntervalSeconds)
fmt.Println("Listener thread and timer have started")
fmt.Printf("loop() ==> Effective batch size %d interval in seconds %d retry attempts %d", m.BatchSize, m.IntervalSeconds, RetryCount)
for {
//select to wait on multiple communication operations
//blocks until one of cases can run
select {
//process new meter message
case msg := <-m.msgs:
fmt.Printf("buffer (%d/%d) %v", len(msgs), m.BatchSize, msg)
msgs = append(msgs, msg)
if len(msgs) >= m.BatchSize {
fmt.Printf("exceeded %d messages – flushing", m.BatchSize)
m.sendAsync(msgs)
msgs = make([]UserFee, 0, m.BatchSize)
}
//timer event
case <-tick.C:
if len(msgs) > 0 {
fmt.Printf("interval reached - flushing %d", len(msgs))
m.sendAsync(msgs)
msgs = make([]UserFee, 0, m.BatchSize)
} else {
fmt.Println("interval reached – nothing to send")
}
//process shutdown
case <-m.quit:
//stop the timer
tick.Stop()
//flush the queue
for msg := range m.msgs {
fmt.Printf("queue: (%d/%d) %v", len(msgs), m.BatchSize, msg)
msgs = append(msgs, msg)
}
fmt.Printf("Flushing %d messages", len(msgs))
m.sendAsync(msgs)
//wait for all messages to be sent to the API
m.wg.Wait()
fmt.Println("Queue flushed")
//let caller know shutdown is compelete
m.shutdown <- struct{}{}
return
}
}
}
......@@ -302,7 +302,7 @@ https://docs.aigic.ai/text-to-image
curl http://192.168.1.220:6000/v1/images/generations \
curl http://192.168.1.220:6000/images/generations \
-H "Content-Type: application/json" \
-H "Authorization: C1c9oY1c1ejsLWhFqqVo2eMvww6ZfQ4G" \
-d '{
......
package main
import (
"context"
"fmt"
"github.com/redis/go-redis/v9"
"github.com/shopspring/decimal"
)
type Dao struct {
*redis.Client
}
func (d *Dao) GetBalanceSubCharge(users []string) (map[string]decimal.Decimal, error) {
l := len(users)
balancekeys := make([]string, 0, l)
chargeKeys := make([]string, 0, l)
for _, user := range users {
balancekeys = append(balancekeys, "balance:"+user)
chargeKeys = append(chargeKeys, "charge:"+user)
}
vs := d.MGet(context.Background(), append(balancekeys, chargeKeys...)...)
res := make(map[string]decimal.Decimal, l)
redisValueRes := vs.Val()
for idx, v := range redisValueRes {
if idx == l {
break
}
//fmt.Println("l", l, "idx", idx, "l+idx", l+idx, "len(redisValueRes)", len(redisValueRes), "v", v, "redisValueRes[l+idx]", redisValueRes[l+idx])
if v != nil && redisValueRes[l+idx] != nil {
balanceStr, ok := v.(string)
if ok {
chargeStr, ok := redisValueRes[l+idx].(string)
if ok {
balanceDec, err := decimal.NewFromString(balanceStr)
if err != nil {
fmt.Println("user balance error", users[idx], err.Error())
res[users[idx]] = decimal.NewFromInt(0)
continue
}
chargeDec, err := decimal.NewFromString(chargeStr)
if err != nil {
fmt.Println("user charge error", users[idx], err.Error())
res[users[idx]] = decimal.NewFromInt(0)
continue
}
if balanceDec.Cmp(chargeDec) == 1 {
realBalance := balanceDec.Sub(chargeDec)
//fmt.Println("user", users[idx], "balance", balanceDec, "charge", chargeDec, "realBalance", realBalance)
res[users[idx]] = realBalance
} else {
res[users[idx]] = decimal.NewFromInt(0)
}
continue
}
}
}
fmt.Println("user balance or charge is nil", "user", users[idx], "balance", v, "charge", redisValueRes[l+idx])
res[users[idx]] = decimal.NewFromInt(0)
continue
}
return res, nil
}
// INCRBY
func (d *Dao) ChargeIncrby(ctx context.Context, users []UserFee) error {
pipe := d.Pipeline()
for _, user := range users {
pipe.IncrBy(ctx, "charge:"+user.User, user.Fee.IntPart())
}
_, err := pipe.Exec(ctx)
return err
}
package main
import (
"context"
"fmt"
"testing"
"github.com/redis/go-redis/v9"
"github.com/shopspring/decimal"
)
func TestRedis(t *testing.T) {
rdb := redis.NewClient(&redis.Options{
Addr: "localhost:6379",
Password: "", // no password set
DB: 0, // use default DB
Protocol: 3, // specify 2 for RESP 2 or 3 for RESP 3
})
vs := rdb.MGet(context.Background(), "key1", "key2")
for k, v := range vs.Val() {
fmt.Printf("idx %d value %v \n", k, v)
}
rdb.IncrBy(context.Background(), "key1", 10)
vs = rdb.MGet(context.Background(), "key1", "key2")
for k, v := range vs.Val() {
fmt.Printf("idx %d value %v type %T\n", k, v, v)
}
}
func TestChargeIncreaseBy(t *testing.T) {
d := &Dao{
redis.NewClient(&redis.Options{
Addr: "localhost:6379",
Password: "", // no password set
DB: 0, // use default DB
Protocol: 3, // specify 2 for RESP 2 or 3 for RESP 3
}),
}
usersFee := make([]UserFee, 0, 100)
for i := 0; i < 10; i++ {
usersFee = append(usersFee, UserFee{
User: fmt.Sprintf("user-%v", i),
Fee: decimal.NewFromInt(int64(i)),
})
}
if err := d.ChargeIncrby(context.Background(), usersFee); err != nil {
t.Fatal(err)
}
}
func TestBalanceIncreaseBy(t *testing.T) {
d := &Dao{
redis.NewClient(&redis.Options{
Addr: "localhost:6379",
Password: "", // no password set
DB: 0, // use default DB
Protocol: 3, // specify 2 for RESP 2 or 3 for RESP 3
}),
}
usersFee := make([]UserFee, 0, 100)
for i := 0; i < 10; i++ {
usersFee = append(usersFee, UserFee{
User: fmt.Sprintf("user-%v", i),
Fee: decimal.NewFromInt(int64(i * 10)),
})
}
if err := d.BalanceIncrby(context.Background(), usersFee); err != nil {
t.Fatal(err)
}
}
func (d *Dao) BalanceIncrby(ctx context.Context, users []UserFee) error {
pipe := d.Pipeline()
for _, user := range users {
pipe.IncrBy(ctx, "balance:"+user.User, user.Fee.IntPart())
}
_, err := pipe.Exec(ctx)
return err
}
func TestGetBalanceSubCharge(t *testing.T) {
d := &Dao{
redis.NewClient(&redis.Options{
Addr: "localhost:6379",
Password: "", // no password set
DB: 0, // use default DB
Protocol: 3, // specify 2 for RESP 2 or 3 for RESP 3
}),
}
usersFee := make([]string, 0, 100)
for i := 0; i < 30; i++ {
usersFee = append(usersFee, fmt.Sprintf("user-%v", i))
}
res, err := d.GetBalanceSubCharge(usersFee)
if err != nil {
t.Fatal(err)
}
for k, v := range res {
t.Logf("k %v v %v \n", k, v)
}
}
......@@ -8,7 +8,14 @@ require (
)
require (
github.com/cespare/xxhash/v2 v2.2.0 // indirect
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect
github.com/gogo/protobuf v1.3.2 // indirect
github.com/jehiah/go-strftime v0.0.0-20171201141054-1d33003b3869 // indirect
github.com/redis/go-redis/v9 v9.4.0 // indirect
github.com/segmentio/backo-go v1.0.1 // indirect
github.com/shopspring/decimal v1.3.1 // indirect
github.com/xtgo/uuid v0.0.0-20140804021211-a0b114877d4c // indirect
google.golang.org/protobuf v1.32.0 // indirect
)
......
......@@ -3,9 +3,13 @@ github.com/IBM/sarama v1.40.0/go.mod h1:6pBloAs1WanL/vsq5qFTyTGulJUntZHhMLOUYEIs
github.com/Shopify/toxiproxy/v2 v2.5.0 h1:i4LPT+qrSlKNtQf5QliVjdP08GyAH8+BUIc9gT0eahc=
github.com/andybalholm/brotli v1.0.5 h1:8uQZIdzKmjc/iuPu7O2ioW48L81FgatrcpfFmiq/cCs=
github.com/andybalholm/brotli v1.0.5/go.mod h1:fO7iG3H7G2nSZ7m0zPUDn85XEX2GTukHGRSepvi9Eig=
github.com/cespare/xxhash/v2 v2.2.0 h1:DC2CZ1Ep5Y4k3ZQ899DldepgrayRUGE6BBZ/cd9Cj44=
github.com/cespare/xxhash/v2 v2.2.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f h1:lO4WD4F/rVNCu3HqELle0jiPLLBs70cWOduZpkS1E78=
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f/go.mod h1:cuUVRXasLTGF7a8hSLbxyZXjz+1KgoB3wDUb6vlszIc=
github.com/eapache/go-resiliency v1.3.0 h1:RRL0nge+cWGlxXbUzJ7yMcq6w2XBEr19dCN6HECGaT0=
github.com/eapache/go-resiliency v1.3.0/go.mod h1:5yPzW0MIvSe0JDsv0v+DvcjEv2FyD6iZYSs1ZI+iQho=
github.com/eapache/go-xerial-snappy v0.0.0-20230111030713-bf00bc1b83b6 h1:8yY/I9ndfrgrXUbOGObLHKBR4Fl3nZXwM2c7OYTT8hM=
......@@ -43,6 +47,8 @@ github.com/jcmturner/gokrb5/v8 v8.4.3 h1:iTonLeSJOn7MVUtyMT+arAn5AKAPrkilzhGw8wE
github.com/jcmturner/gokrb5/v8 v8.4.3/go.mod h1:dqRwJGXznQrzw6cWmyo6kH+E7jksEQG/CyVWsJEsJO0=
github.com/jcmturner/rpc/v2 v2.0.3 h1:7FXXj8Ti1IaVFpSAziCZWNzbNuZmnvw/i6CqLNdWfZY=
github.com/jcmturner/rpc/v2 v2.0.3/go.mod h1:VUJYCIDm3PVOEHw8sgt091/20OJjskO/YJki3ELg/Hc=
github.com/jehiah/go-strftime v0.0.0-20171201141054-1d33003b3869 h1:IPJ3dvxmJ4uczJe5YQdrYB16oTJlGSC/OyZDqUk9xX4=
github.com/jehiah/go-strftime v0.0.0-20171201141054-1d33003b3869/go.mod h1:cJ6Cj7dQo+O6GJNiMx+Pa94qKj+TG8ONdKHgMNIyyag=
github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8=
github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck=
github.com/klauspost/compress v1.16.7 h1:2mk3MPGNzKyxErAw8YaohYh69+pa4sIQSC0fPGCFR9I=
......@@ -60,9 +66,15 @@ github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZb
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475 h1:N/ElC8H3+5XpJzTSTfLsJV/mx9Q9g7kxmchpfZyxgzM=
github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475/go.mod h1:bCqnVzQkZxMG4s8nGwiZ5l3QUCyqpo9Y+/ZMZ9VjZe4=
github.com/redis/go-redis/v9 v9.4.0 h1:Yzoz33UZw9I/mFhx4MNrB6Fk+XHO1VukNcCa1+lwyKk=
github.com/redis/go-redis/v9 v9.4.0/go.mod h1:hdY0cQFCN4fnSYT6TkisLufl/4W5UIXyv0b/CLO2V2M=
github.com/rivo/uniseg v0.2.0/go.mod h1:J6wj4VEh+S6ZtnVlnTBMWIodfgj8LQOQFoIToxlJtxc=
github.com/rivo/uniseg v0.4.4 h1:8TfxU8dW6PdqD27gjM8MVNuicgxIjxpm4K7x4jp8sis=
github.com/rivo/uniseg v0.4.4/go.mod h1:FN3SvrM+Zdj16jyLfmOkMNblXMcoc8DfTHruCPUcx88=
github.com/segmentio/backo-go v1.0.1 h1:68RQccglxZeyURy93ASB/2kc9QudzgIDexJ927N++y4=
github.com/segmentio/backo-go v1.0.1/go.mod h1:9/Rh6yILuLysoQnZ2oNooD2g7aBnvM7r/fNVxRNWfBc=
github.com/shopspring/decimal v1.3.1 h1:2Usl1nmF/WZucqkFZhnfFYxxxu8LG21F6nPQBE5gKV8=
github.com/shopspring/decimal v1.3.1/go.mod h1:DKyhrW/HYNuLGql+MJL6WCR6knT2jwCFRcu2hWCYk4o=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw=
github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4=
......@@ -75,6 +87,8 @@ github.com/valyala/fasthttp v1.48.0 h1:oJWvHb9BIZToTQS3MuQ2R3bJZiNSa2KiNdeI8A+79
github.com/valyala/fasthttp v1.48.0/go.mod h1:k2zXd82h/7UZc3VOdJ2WaUqt1uZ/XpXAfE9i+HBC3lA=
github.com/valyala/tcplisten v1.0.0 h1:rBHj/Xf+E1tRGZyWIWwJDiRY0zc1Js+CV5DqwacVSA8=
github.com/valyala/tcplisten v1.0.0/go.mod h1:T0xQ8SeCZGxckz9qRXTfG43PvQ/mcWh7FwZEA7Ioqkc=
github.com/xtgo/uuid v0.0.0-20140804021211-a0b114877d4c h1:3lbZUMbMiGUW/LMkfsEABsc5zNT9+b1CvsJx47JzJ8g=
github.com/xtgo/uuid v0.0.0-20140804021211-a0b114877d4c/go.mod h1:UrdRz5enIKZ63MEE3IF9l2/ebyx59GyGgPi+tICQdmM=
github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
......
package main
import (
"log"
"github.com/gofiber/fiber/v2"
)
var logger = func(c *fiber.Ctx) {
log.Printf("Request: %s %s", c.Method(), c.Path())
c.Next()
}
......@@ -163,6 +163,7 @@ func kafkaConsumerBytes(wg *sync.WaitGroup) {
// Infinite loop to continuously listen for messages from the partitionConsumer.Messages() channel.
for {
select {
case message := <-partitionConsumer.Messages():
......@@ -275,17 +276,19 @@ func main() {
wg.Add(1)
// Launch the Kafka producer goroutine in the background.
go kafkaProducer()
//go kafkaProducer()
go kafkaProducerBytes()
// Launch the Kafka consumer goroutine in the background, passing the WaitGroup for synchronization.
go kafkaConsumer(wg)
//go kafkaConsumer(wg)
//go kafkaConsumerBytes(wg)
go kafkaConsumerBytes(wg)
go recordUUID()
// Wait for the consumer goroutine to be ready
wg.Wait()
//wg.Wait()
app.Post("/chat/completions", func(c *fiber.Ctx) error {
......@@ -338,7 +341,7 @@ func main() {
return c.JSON(resAsJson)
//return c.SendString("Message sent to Kafka producer.")
})
}).Use(logger)
app.Post("/images/generations", func(c *fiber.Ctx) error {
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment