From d839b8d39f6192580dfc16b769e31953e7204b64 Mon Sep 17 00:00:00 2001 From: Ubuntu <ubuntu@ip-172-26-1-155.eu-west-2.compute.internal> Date: Mon, 28 Feb 2022 07:32:27 +0000 Subject: [PATCH] from to amount fields --- go.mod | 1 + go.sum | 2 ++ originalTx.go | 2 +- redis.go | 61 +++++++++++++++------------------------------------ redis_test.go | 2 +- 5 files changed, 23 insertions(+), 45 deletions(-) diff --git a/go.mod b/go.mod index 7fa98aa..42e557d 100644 --- a/go.mod +++ b/go.mod @@ -22,6 +22,7 @@ require ( github.com/tklauser/numcpus v0.2.2 // indirect golang.org/x/crypto v0.0.0-20210322153248-0c34fe9e7dc2 // indirect golang.org/x/sys v0.0.0-20210816183151-1e6c022a8912 // indirect + golang.org/x/time v0.0.0-20220224211638-0e9765cccd65 // indirect google.golang.org/protobuf v1.27.1 // indirect gopkg.in/natefinch/npipe.v2 v2.0.0-20160621034901-c1b8fa8bdcce // indirect ) diff --git a/go.sum b/go.sum index 27d8681..bca6618 100644 --- a/go.sum +++ b/go.sum @@ -491,6 +491,8 @@ golang.org/x/time v0.0.0-20181108054448-85acf8d2951c/go.mod h1:tRJNPiyCQ0inRvYxb golang.org/x/time v0.0.0-20190308202827-9d24e82272b4/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= golang.org/x/time v0.0.0-20201208040808-7e3f01d25324/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= golang.org/x/time v0.0.0-20210220033141-f8bda1e9f3ba/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= +golang.org/x/time v0.0.0-20220224211638-0e9765cccd65 h1:M73Iuj3xbbb9Uk1DYhzydthsj6oOd6l9bpuFcNoUvTs= +golang.org/x/time v0.0.0-20220224211638-0e9765cccd65/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= golang.org/x/tools v0.0.0-20180525024113-a5b4c53f6e8b/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20181030221726-6c7e314b6563/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= diff --git a/originalTx.go b/originalTx.go index bd258de..72d883d 100644 --- a/originalTx.go +++ b/originalTx.go @@ -19,7 +19,7 @@ var originalTxsHashQueue chan *[]byte = make(chan *[]byte, 1000) var batchTxsForRedis chan *BatchTx = make(chan *BatchTx, batchTxHashSize*batchTxHashQueueSize) -const batchTxSize = 1000 +const batchTxSize = 10000 const batchTxHashSize = 100 const batchTxHashQueueSize = 10 diff --git a/redis.go b/redis.go index abeac9d..9254e83 100644 --- a/redis.go +++ b/redis.go @@ -9,7 +9,8 @@ import ( "github.com/golang/protobuf/proto" "github.com/go-redis/redis/v8" - "github.com/go-redis/redis_rate/v9" + "golang.org/x/time/rate" + // "github.com/go-redis/redis_rate/v9" ) var ( @@ -18,13 +19,13 @@ var ( type Job struct { Client *redis.Client - Limiter *redis_rate.Limiter + //Limiter *redis_rate.Limiter Id int } -func initClient(poolSize int) (*redis.Client, *redis_rate.Limiter) { +func initClient(poolSize int) (*redis.Client) { client := redis.NewClient(&redis.Options{ - Addr: "54.250.115.98:6379", + Addr: "127.0.0.1:6379", DialTimeout: time.Second, ReadTimeout: time.Second, WriteTimeout: time.Second, @@ -37,61 +38,35 @@ func initClient(poolSize int) (*redis.Client, *redis_rate.Limiter) { panic(err) } - return client, redis_rate.NewLimiter(client) + return client } func Start() { //浠诲姟channel 瀹氫箟缂撳啿鍣ㄤ负job鏁伴噺 - jobs := make(chan Job, jobnum) + //jobs := make(chan Job, jobnum) - client, limiter := initClient(10) - //defer client.Close() + client := initClient(10) + count := 0 + limiter := rate.NewLimiter(rate.Every(time.Millisecond*100), 1) + cxt, _ := context.WithCancel(context.TODO()) - //瀹氫箟姣忎釜浠诲姟鎵ц鐨勬柟娉� - jobfunc := func(client *redis.Client, limiter *redis_rate.Limiter, id int) error { - ctx := context.Background() - count := 0 - for { + for { + limiter.Wait(cxt) select { case txs := <-batchTxsForRedis: startTime := time.Now() data, err := proto.Marshal(txs) if err != nil { - return err + panic(err) } - res, err := limiter.Allow(ctx, "txlimit", redis_rate.PerSecond(10)) - if err != nil { - return err + if err := client.LPush(context.Background(), "list", data).Err(); err != nil { + panic(err) } - if res.Allowed > 0 { - if err := client.LPush(context.Background(), "list", data).Err(); err != nil { - return err - } - } count += 1 - fmt.Printf("id: %d send %d txs size: %d takes %v \n", id, count, len(data), time.Since(startTime)) - } - } - } - - //1 娣诲姞 job 鍒� channel - go func() { - for index := 0; index < jobnum; index++ { - jobs <- Job{Client: client, Limiter: limiter, Id: index} + fmt.Printf("count %d txs size: %d takes %v time: %s \n", count, len(data), time.Since(startTime),time.Now()) } - defer close(jobs) - }() - - //2 骞惰鎵ц jobs - for j := range jobs { - go func(job Job) { - if err := jobfunc(client, limiter, j.Id); err != nil { - panic(err.Error()) - } - }(j) - } - + } } diff --git a/redis_test.go b/redis_test.go index 23e280e..2084e9c 100644 --- a/redis_test.go +++ b/redis_test.go @@ -11,7 +11,7 @@ import ( func TestRateLimit(t *testing.T) { - client, _ := initClient(10) + client:= initClient(10) ctx := context.Background() limiter := redis_rate.NewLimiter(client) -- 2.23.0