Commit ae2ca46c authored by 李伟@五瓣科技's avatar 李伟@五瓣科技

upadte send msg to redis

parent bd65d8fc
package multisend
import (
"context"
"crypto/ecdsa"
"encoding/json"
"fmt"
"math/big"
"sync"
......@@ -14,7 +12,6 @@ import (
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/crypto"
"github.com/go-redis/redis/v8"
// "github.com/cbergoon/merkletree"
)
......@@ -24,8 +21,8 @@ var originalTxsHashQueue chan *[]byte = make(chan *[]byte, 1000)
var originalTxsWithFromQueue chan *TxsHash = make(chan *TxsHash, 2000000)
const batchSize = 5
const hashRootSize = 2
const batchSize = 100
const hashRootSize = 200
func init() {
......@@ -121,32 +118,32 @@ func ProduceOriginalTx() error {
}
var ctx = context.Background()
var rdb = redis.NewClient(&redis.Options{
Addr: "54.250.115.98:6379",
Password: "redis20220217", // no password set
DB: 0, // use default DB
})
func SendMd5Tx() {
//超时 超量
// count := 0
// sendTicker := time.NewTicker(time.Duration(5) * time.Second)
for {
select {
case txs := <-originalTxsWithFromQueue:
txsAsJson, err := json.Marshal(txs)
if err != nil {
fmt.Println(err.Error())
continue
}
// var ctx = context.Background()
// var rdb = redis.NewClient(&redis.Options{
// Addr: "54.250.115.98:6379",
// Password: "redis20220217", // no password set
// DB: 0, // use default DB
// })
// func SendMd5Tx() {
// //超时 超量
// // count := 0
// // sendTicker := time.NewTicker(time.Duration(5) * time.Second)
// for {
// select {
// case txs := <-originalTxsWithFromQueue:
// txsAsJson, err := json.Marshal(txs)
// if err != nil {
// fmt.Println(err.Error())
// continue
// }
rdb.LPush(ctx, "list", txsAsJson)
}
}
}
// rdb.LPush(ctx, "list", txsAsJson)
// }
// }
// }
func StartProduceTx() {
wg := sync.WaitGroup{}
......@@ -163,7 +160,7 @@ func StartProduceTx() {
go func() {
defer wg.Done()
SendMd5Tx()
Start()
}()
wg.Wait()
......
package multisend
import (
"context"
"encoding/json"
"fmt"
"runtime"
"time"
"github.com/go-redis/redis/v8"
)
var (
jobnum = runtime.NumCPU()
)
type Job struct {
Client *redis.Client
}
func initClient(poolSize int) *redis.Client {
client := redis.NewClient(&redis.Options{
Addr: "54.250.115.98:6379",
DialTimeout: time.Second,
ReadTimeout: time.Second,
WriteTimeout: time.Second,
PoolSize: poolSize,
Password: "redis20220217",
DB: 0,
})
if err := client.FlushAll(context.Background()).Err(); err != nil {
panic(err)
}
return client
}
func Start() {
//任务channel 定义缓冲器为job数量
jobs := make(chan Job, jobnum)
client := initClient(10)
//defer client.Close()
//定义每个任务执行的方法
jobfunc := func(client *redis.Client) error {
for {
select {
case txs := <-originalTxsWithFromQueue:
txsAsJson, err := json.Marshal(txs)
if err != nil {
return err
}
if err := client.LPush(context.Background(), "list", txsAsJson).Err(); err != nil {
fmt.Println(err.Error())
}
}
}
}
//1 添加 job 到 channel
go func() {
for index := 0; index < jobnum; index++ {
jobs <- Job{client}
}
defer close(jobs)
}()
//2 并行执行 jobs
for j := range jobs {
go func(job Job) {
if err := jobfunc(client); err != nil {
panic(err.Error())
}
}(j)
}
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment