Commit 797bfe89 authored by 李伟@五瓣科技's avatar 李伟@五瓣科技

redis rate limit

parent fbbba6ed
...@@ -12,6 +12,7 @@ require ( ...@@ -12,6 +12,7 @@ require (
github.com/ethereum/go-ethereum v1.10.16 // indirect github.com/ethereum/go-ethereum v1.10.16 // indirect
github.com/go-ole/go-ole v1.2.1 // indirect github.com/go-ole/go-ole v1.2.1 // indirect
github.com/go-redis/redis/v8 v8.11.4 // indirect github.com/go-redis/redis/v8 v8.11.4 // indirect
github.com/go-redis/redis_rate/v9 v9.1.2 // indirect
github.com/go-stack/stack v1.8.0 // indirect github.com/go-stack/stack v1.8.0 // indirect
github.com/golang/protobuf v1.5.2 // indirect github.com/golang/protobuf v1.5.2 // indirect
github.com/gorilla/websocket v1.4.2 // indirect github.com/gorilla/websocket v1.4.2 // indirect
......
...@@ -133,6 +133,8 @@ github.com/go-openapi/jsonpointer v0.19.5/go.mod h1:Pl9vOtqEWErmShwVjC8pYs9cog34 ...@@ -133,6 +133,8 @@ github.com/go-openapi/jsonpointer v0.19.5/go.mod h1:Pl9vOtqEWErmShwVjC8pYs9cog34
github.com/go-openapi/swag v0.19.5/go.mod h1:POnQmlKehdgb5mhVOsnJFsivZCEZ/vjK9gh66Z9tfKk= github.com/go-openapi/swag v0.19.5/go.mod h1:POnQmlKehdgb5mhVOsnJFsivZCEZ/vjK9gh66Z9tfKk=
github.com/go-redis/redis/v8 v8.11.4 h1:kHoYkfZP6+pe04aFTnhDH6GDROa5yJdHJVNxV3F46Tg= github.com/go-redis/redis/v8 v8.11.4 h1:kHoYkfZP6+pe04aFTnhDH6GDROa5yJdHJVNxV3F46Tg=
github.com/go-redis/redis/v8 v8.11.4/go.mod h1:2Z2wHZXdQpCDXEGzqMockDpNyYvi2l4Pxt6RJr792+w= github.com/go-redis/redis/v8 v8.11.4/go.mod h1:2Z2wHZXdQpCDXEGzqMockDpNyYvi2l4Pxt6RJr792+w=
github.com/go-redis/redis_rate/v9 v9.1.2 h1:H0l5VzoAtOE6ydd38j8MCq3ABlGLnvvbA1xDSVVCHgQ=
github.com/go-redis/redis_rate/v9 v9.1.2/go.mod h1:oam2de2apSgRG8aJzwJddXbNu91Iyz1m8IKJE2vpvlQ=
github.com/go-sourcemap/sourcemap v2.1.3+incompatible/go.mod h1:F8jJfvm2KbVjc5NqelyYJmf/v5J0dwNLS2mL4sNA1Jg= github.com/go-sourcemap/sourcemap v2.1.3+incompatible/go.mod h1:F8jJfvm2KbVjc5NqelyYJmf/v5J0dwNLS2mL4sNA1Jg=
github.com/go-sql-driver/mysql v1.4.1/go.mod h1:zAC/RDZ24gD3HViQzih4MyKcchzm+sOG5ZlKdlhCg5w= github.com/go-sql-driver/mysql v1.4.1/go.mod h1:zAC/RDZ24gD3HViQzih4MyKcchzm+sOG5ZlKdlhCg5w=
github.com/go-stack/stack v1.8.0 h1:5SgMzNM5HxrEjV0ww2lTmX6E2Izsfxas4+YHWRs3Lsk= github.com/go-stack/stack v1.8.0 h1:5SgMzNM5HxrEjV0ww2lTmX6E2Izsfxas4+YHWRs3Lsk=
......
...@@ -19,7 +19,7 @@ var originalTxsHashQueue chan *[]byte = make(chan *[]byte, 1000) ...@@ -19,7 +19,7 @@ var originalTxsHashQueue chan *[]byte = make(chan *[]byte, 1000)
var batchTxsForRedis chan *BatchTx = make(chan *BatchTx, batchTxHashSize*batchTxHashQueueSize) var batchTxsForRedis chan *BatchTx = make(chan *BatchTx, batchTxHashSize*batchTxHashQueueSize)
const batchTxSize = 5000 const batchTxSize = 1000
const batchTxHashSize = 100 const batchTxHashSize = 100
const batchTxHashQueueSize = 10 const batchTxHashQueueSize = 10
......
...@@ -9,6 +9,7 @@ import ( ...@@ -9,6 +9,7 @@ import (
"github.com/golang/protobuf/proto" "github.com/golang/protobuf/proto"
"github.com/go-redis/redis/v8" "github.com/go-redis/redis/v8"
"github.com/go-redis/redis_rate/v9"
) )
var ( var (
...@@ -16,11 +17,12 @@ var ( ...@@ -16,11 +17,12 @@ var (
) )
type Job struct { type Job struct {
Client *redis.Client Client *redis.Client
Id int Limiter *redis_rate.Limiter
Id int
} }
func initClient(poolSize int) *redis.Client { func initClient(poolSize int) (*redis.Client, *redis_rate.Limiter) {
client := redis.NewClient(&redis.Options{ client := redis.NewClient(&redis.Options{
Addr: "54.250.115.98:6379", Addr: "54.250.115.98:6379",
DialTimeout: time.Second, DialTimeout: time.Second,
...@@ -34,7 +36,8 @@ func initClient(poolSize int) *redis.Client { ...@@ -34,7 +36,8 @@ func initClient(poolSize int) *redis.Client {
if err := client.FlushAll(context.Background()).Err(); err != nil { if err := client.FlushAll(context.Background()).Err(); err != nil {
panic(err) panic(err)
} }
return client
return client, redis_rate.NewLimiter(client)
} }
func Start() { func Start() {
...@@ -42,12 +45,12 @@ func Start() { ...@@ -42,12 +45,12 @@ func Start() {
//任务channel 定义缓冲器为job数量 //任务channel 定义缓冲器为job数量
jobs := make(chan Job, jobnum) jobs := make(chan Job, jobnum)
client := initClient(10) client, limiter := initClient(10)
//defer client.Close() //defer client.Close()
//定义每个任务执行的方法 //定义每个任务执行的方法
jobfunc := func(client *redis.Client, id int) error { jobfunc := func(client *redis.Client, limiter *redis_rate.Limiter, id int) error {
ctx := context.Background()
count := 0 count := 0
for { for {
select { select {
...@@ -58,9 +61,16 @@ func Start() { ...@@ -58,9 +61,16 @@ func Start() {
return err return err
} }
if err := client.LPush(context.Background(), "list", data).Err(); err != nil { res, err := limiter.Allow(ctx, "txlimit", redis_rate.PerSecond(10))
if err != nil {
return err return err
} }
if res.Allowed > 0 {
if err := client.LPush(context.Background(), "list", data).Err(); err != nil {
return err
}
}
count += 1 count += 1
fmt.Printf("id: %d send %d txs size: %d takes %v \n", id, count, len(data), time.Since(startTime)) fmt.Printf("id: %d send %d txs size: %d takes %v \n", id, count, len(data), time.Since(startTime))
} }
...@@ -70,7 +80,7 @@ func Start() { ...@@ -70,7 +80,7 @@ func Start() {
//1 添加 job 到 channel //1 添加 job 到 channel
go func() { go func() {
for index := 0; index < jobnum; index++ { for index := 0; index < jobnum; index++ {
jobs <- Job{client, index} jobs <- Job{Client: client, Limiter: limiter, Id: index}
} }
defer close(jobs) defer close(jobs)
}() }()
...@@ -78,7 +88,7 @@ func Start() { ...@@ -78,7 +88,7 @@ func Start() {
//2 并行执行 jobs //2 并行执行 jobs
for j := range jobs { for j := range jobs {
go func(job Job) { go func(job Job) {
if err := jobfunc(client, j.Id); err != nil { if err := jobfunc(client, limiter, j.Id); err != nil {
panic(err.Error()) panic(err.Error())
} }
}(j) }(j)
......
package multisend
import (
"context"
"fmt"
"testing"
"time"
"github.com/go-redis/redis_rate/v9"
)
func TestRateLimit(t *testing.T) {
client, _ := initClient(10)
ctx := context.Background()
limiter := redis_rate.NewLimiter(client)
for i := 0; i < 10; i++ {
res, err := limiter.Allow(ctx, "project:123", redis_rate.PerSecond(10))
if err != nil {
panic(err)
}
client.Set(ctx, "key", i, time.Second*100)
fmt.Println("allowed", res.Allowed, "remaining", res.Remaining)
}
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment