redis.go 2.52 KB
Newer Older
1 2 3 4
package multisend

import (
	"context"
Ubuntu's avatar
Ubuntu committed
5
	"encoding/json"
6
	"fmt"
李伟@五瓣科技's avatar
李伟@五瓣科技 committed
7

8 9
	"time"

10
	"code.wuban.net.cn/multisend/internal/logging"
李伟@五瓣科技's avatar
李伟@五瓣科技 committed
11
	"github.com/ethereum/go-ethereum/common"
12
	"github.com/go-redis/redis/v8"
Ubuntu's avatar
Ubuntu committed
13
	"golang.org/x/time/rate"
14 15
)

李伟@五瓣科技's avatar
李伟@五瓣科技 committed
16 17
var redisCli *redis.Client

李伟@五瓣科技's avatar
李伟@五瓣科技 committed
18
func initClient(poolSize int, redisAddr, passwd string) *redis.Client {
19
	client := redis.NewClient(&redis.Options{
李伟@五瓣科技's avatar
李伟@五瓣科技 committed
20
		Addr:         redisAddr,
21 22 23 24
		DialTimeout:  time.Second,
		ReadTimeout:  time.Second,
		WriteTimeout: time.Second,
		PoolSize:     poolSize,
李伟@五瓣科技's avatar
李伟@五瓣科技 committed
25
		Password:     passwd,
26 27 28 29 30 31
		DB:           0,
	})

	if err := client.FlushAll(context.Background()).Err(); err != nil {
		panic(err)
	}
李伟@五瓣科技's avatar
李伟@五瓣科技 committed
32

Ubuntu's avatar
Ubuntu committed
33
	return client
34 35
}

李伟@五瓣科技's avatar
李伟@五瓣科技 committed
36
func Start(redisAddr, passwd string) {
37

38 39
	logger := logging.NewLogrusLogger(fmt.Sprintf("redis[%s]", redisAddr))

李伟@五瓣科技's avatar
李伟@五瓣科技 committed
40
	client := initClient(10, redisAddr, passwd)
李伟@五瓣科技's avatar
李伟@五瓣科技 committed
41 42 43 44 45

	if redisCli == nil {
		redisCli = client
	}

Ubuntu's avatar
Ubuntu committed
46 47 48
	count := 0
	limiter := rate.NewLimiter(rate.Every(time.Millisecond*100), 1)
	cxt, _ := context.WithCancel(context.TODO())
49

50 51
	logTicker := time.NewTicker(5 * time.Second)

Ubuntu's avatar
Ubuntu committed
52 53
	for {
		limiter.Wait(cxt)
Ubuntu's avatar
Ubuntu committed
54 55 56
		select {
		case batchTxs := <-batchTxsForRedis:

李伟@五瓣科技's avatar
李伟@五瓣科技 committed
57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78
			hashs := []common.Hash{}

			txBytes := []TxBytes{}

			for _, v := range batchTxs.Txs {
				hashs = append(hashs, v.Tx.Hash())

				txAsBytes, err := v.Tx.MarshalBinary()
				if err != nil {
					panic(err)
				}

				txBytes = append(txBytes, TxBytes{
					From: v.From,
					Tx:   txAsBytes,
				})
			}

			batchTxsAsBytes, err := json.Marshal(RedisBatchTxs{
				Hash: batchTxs.Hash,
				Txs:  txBytes,
			})
79

Ubuntu's avatar
Ubuntu committed
80 81 82
			if err != nil {
				panic(err)
			}
李伟@五瓣科技's avatar
李伟@五瓣科技 committed
83

李伟@五瓣科技's avatar
李伟@五瓣科技 committed
84 85
			fmt.Printf("batchTxs.Hash: %x  count: %d  \n", batchTxs.Hash, count)

Ubuntu's avatar
Ubuntu committed
86 87 88
			if err := client.LPush(context.Background(), "list", batchTxsAsBytes).Err(); err != nil {
				panic(err)
			}
李伟@五瓣科技's avatar
李伟@五瓣科技 committed
89

李伟@五瓣科技's avatar
李伟@五瓣科技 committed
90
			hashstr := []string{}
李伟@五瓣科技's avatar
李伟@五瓣科技 committed
91

李伟@五瓣科技's avatar
李伟@五瓣科技 committed
92 93 94 95
			for _, v := range hashs {
				hashstr = append(hashstr, v.Hex())
			}

李伟@五瓣科技's avatar
李伟@五瓣科技 committed
96
			//fmt.Printf("batchTxHash %s  listLen: %d  list: %v \n", fmt.Sprintf("%x", batchTxs.Hash), len(hashstr), hashstr)
李伟@五瓣科技's avatar
李伟@五瓣科技 committed
97

98
			if err := client.LPush(context.Background(), fmt.Sprintf("%x", batchTxs.Hash), reverse(hashstr)); err != nil {
李伟@五瓣科技's avatar
李伟@五瓣科技 committed
99

李伟@五瓣科技's avatar
李伟@五瓣科技 committed
100 101
			}

Ubuntu's avatar
Ubuntu committed
102
			count += 1
103 104

		case <-logTicker.C:
105
			logger.Info("Sending batchTxs to redis", "idx", count, "totaltxsCount", batchTxSize*count, "now", time.Now().Format("15:04:05"))
106

107
		}
Ubuntu's avatar
Ubuntu committed
108
	}
109
}
李伟@五瓣科技's avatar
李伟@五瓣科技 committed
110 111

type TxBytes struct {
李伟@五瓣科技's avatar
李伟@五瓣科技 committed
112 113
	From []byte `json:"From"`
	Tx   []byte `json:"TxBytes"`
李伟@五瓣科技's avatar
李伟@五瓣科技 committed
114 115 116
}

type RedisBatchTxs struct {
李伟@五瓣科技's avatar
李伟@五瓣科技 committed
117 118
	Txs  []TxBytes `json:"Txs"`
	Hash []byte    `json:"Hash"`
李伟@五瓣科技's avatar
李伟@五瓣科技 committed
119
}
120 121 122 123 124 125 126

func reverse(s []string) []string {
	for i, j := 0, len(s)-1; i < j; i, j = i+1, j-1 {
		s[i], s[j] = s[j], s[i]
	}
	return s
}