Commit d839b8d3 authored by Ubuntu's avatar Ubuntu

from to amount fields

parent 797bfe89
...@@ -22,6 +22,7 @@ require ( ...@@ -22,6 +22,7 @@ require (
github.com/tklauser/numcpus v0.2.2 // indirect github.com/tklauser/numcpus v0.2.2 // indirect
golang.org/x/crypto v0.0.0-20210322153248-0c34fe9e7dc2 // indirect golang.org/x/crypto v0.0.0-20210322153248-0c34fe9e7dc2 // indirect
golang.org/x/sys v0.0.0-20210816183151-1e6c022a8912 // indirect golang.org/x/sys v0.0.0-20210816183151-1e6c022a8912 // indirect
golang.org/x/time v0.0.0-20220224211638-0e9765cccd65 // indirect
google.golang.org/protobuf v1.27.1 // indirect google.golang.org/protobuf v1.27.1 // indirect
gopkg.in/natefinch/npipe.v2 v2.0.0-20160621034901-c1b8fa8bdcce // indirect gopkg.in/natefinch/npipe.v2 v2.0.0-20160621034901-c1b8fa8bdcce // indirect
) )
...@@ -491,6 +491,8 @@ golang.org/x/time v0.0.0-20181108054448-85acf8d2951c/go.mod h1:tRJNPiyCQ0inRvYxb ...@@ -491,6 +491,8 @@ golang.org/x/time v0.0.0-20181108054448-85acf8d2951c/go.mod h1:tRJNPiyCQ0inRvYxb
golang.org/x/time v0.0.0-20190308202827-9d24e82272b4/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= golang.org/x/time v0.0.0-20190308202827-9d24e82272b4/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
golang.org/x/time v0.0.0-20201208040808-7e3f01d25324/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= golang.org/x/time v0.0.0-20201208040808-7e3f01d25324/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
golang.org/x/time v0.0.0-20210220033141-f8bda1e9f3ba/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= golang.org/x/time v0.0.0-20210220033141-f8bda1e9f3ba/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
golang.org/x/time v0.0.0-20220224211638-0e9765cccd65 h1:M73Iuj3xbbb9Uk1DYhzydthsj6oOd6l9bpuFcNoUvTs=
golang.org/x/time v0.0.0-20220224211638-0e9765cccd65/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
golang.org/x/tools v0.0.0-20180525024113-a5b4c53f6e8b/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20180525024113-a5b4c53f6e8b/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/tools v0.0.0-20181030221726-6c7e314b6563/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20181030221726-6c7e314b6563/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
......
...@@ -19,7 +19,7 @@ var originalTxsHashQueue chan *[]byte = make(chan *[]byte, 1000) ...@@ -19,7 +19,7 @@ var originalTxsHashQueue chan *[]byte = make(chan *[]byte, 1000)
var batchTxsForRedis chan *BatchTx = make(chan *BatchTx, batchTxHashSize*batchTxHashQueueSize) var batchTxsForRedis chan *BatchTx = make(chan *BatchTx, batchTxHashSize*batchTxHashQueueSize)
const batchTxSize = 1000 const batchTxSize = 10000
const batchTxHashSize = 100 const batchTxHashSize = 100
const batchTxHashQueueSize = 10 const batchTxHashQueueSize = 10
......
...@@ -9,7 +9,8 @@ import ( ...@@ -9,7 +9,8 @@ import (
"github.com/golang/protobuf/proto" "github.com/golang/protobuf/proto"
"github.com/go-redis/redis/v8" "github.com/go-redis/redis/v8"
"github.com/go-redis/redis_rate/v9" "golang.org/x/time/rate"
// "github.com/go-redis/redis_rate/v9"
) )
var ( var (
...@@ -18,13 +19,13 @@ var ( ...@@ -18,13 +19,13 @@ var (
type Job struct { type Job struct {
Client *redis.Client Client *redis.Client
Limiter *redis_rate.Limiter //Limiter *redis_rate.Limiter
Id int Id int
} }
func initClient(poolSize int) (*redis.Client, *redis_rate.Limiter) { func initClient(poolSize int) (*redis.Client) {
client := redis.NewClient(&redis.Options{ client := redis.NewClient(&redis.Options{
Addr: "54.250.115.98:6379", Addr: "127.0.0.1:6379",
DialTimeout: time.Second, DialTimeout: time.Second,
ReadTimeout: time.Second, ReadTimeout: time.Second,
WriteTimeout: time.Second, WriteTimeout: time.Second,
...@@ -37,61 +38,35 @@ func initClient(poolSize int) (*redis.Client, *redis_rate.Limiter) { ...@@ -37,61 +38,35 @@ func initClient(poolSize int) (*redis.Client, *redis_rate.Limiter) {
panic(err) panic(err)
} }
return client, redis_rate.NewLimiter(client) return client
} }
func Start() { func Start() {
//任务channel 定义缓冲器为job数量 //任务channel 定义缓冲器为job数量
jobs := make(chan Job, jobnum) //jobs := make(chan Job, jobnum)
client, limiter := initClient(10) client := initClient(10)
//defer client.Close() count := 0
limiter := rate.NewLimiter(rate.Every(time.Millisecond*100), 1)
cxt, _ := context.WithCancel(context.TODO())
//定义每个任务执行的方法 for {
jobfunc := func(client *redis.Client, limiter *redis_rate.Limiter, id int) error { limiter.Wait(cxt)
ctx := context.Background()
count := 0
for {
select { select {
case txs := <-batchTxsForRedis: case txs := <-batchTxsForRedis:
startTime := time.Now() startTime := time.Now()
data, err := proto.Marshal(txs) data, err := proto.Marshal(txs)
if err != nil { if err != nil {
return err panic(err)
} }
res, err := limiter.Allow(ctx, "txlimit", redis_rate.PerSecond(10)) if err := client.LPush(context.Background(), "list", data).Err(); err != nil {
if err != nil { panic(err)
return err
} }
if res.Allowed > 0 {
if err := client.LPush(context.Background(), "list", data).Err(); err != nil {
return err
}
}
count += 1 count += 1
fmt.Printf("id: %d send %d txs size: %d takes %v \n", id, count, len(data), time.Since(startTime)) fmt.Printf("count %d txs size: %d takes %v time: %s \n", count, len(data), time.Since(startTime),time.Now())
}
}
}
//1 添加 job 到 channel
go func() {
for index := 0; index < jobnum; index++ {
jobs <- Job{Client: client, Limiter: limiter, Id: index}
} }
defer close(jobs) }
}()
//2 并行执行 jobs
for j := range jobs {
go func(job Job) {
if err := jobfunc(client, limiter, j.Id); err != nil {
panic(err.Error())
}
}(j)
}
} }
...@@ -11,7 +11,7 @@ import ( ...@@ -11,7 +11,7 @@ import (
func TestRateLimit(t *testing.T) { func TestRateLimit(t *testing.T) {
client, _ := initClient(10) client:= initClient(10)
ctx := context.Background() ctx := context.Background()
limiter := redis_rate.NewLimiter(client) limiter := redis_rate.NewLimiter(client)
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment