redis.go 2.38 KB
package multisend

import (
	"context"
	"encoding/json"
	"fmt"

	"time"

	"code.wuban.net.cn/multisend/internal/logging"
	"github.com/ethereum/go-ethereum/common"
	"github.com/go-redis/redis/v8"
	"golang.org/x/time/rate"
)

// redisCli is the package-level Redis client. Start assigns it the client it
// creates, but only while redisCli is still nil, so the first assignment wins.
var redisCli *redis.Client

// initClient creates a go-redis client for the server at redisAddr, using
// passwd for authentication, database 0, and a connection pool of poolSize.
// Dial, read and write operations each time out after one second.
//
// Before returning, it issues FLUSHALL against the server, so any data already
// stored there is wiped. It panics if that command fails (which also covers
// the server being unreachable).
func initClient(poolSize int, redisAddr, passwd string) *redis.Client {
	const ioTimeout = time.Second

	opts := &redis.Options{
		Addr:         redisAddr,
		Password:     passwd,
		DB:           0,
		PoolSize:     poolSize,
		DialTimeout:  ioTimeout,
		ReadTimeout:  ioTimeout,
		WriteTimeout: ioTimeout,
	}
	cli := redis.NewClient(opts)

	// Start from an empty server; a failure here is treated as fatal.
	flushErr := cli.FlushAll(context.Background()).Err()
	if flushErr != nil {
		panic(flushErr)
	}

	return cli
}

// Start connects to the Redis server at redisAddr (authenticating with passwd)
// via initClient and then loops forever, draining batchTxsForRedis into Redis.
//
// For every batch received it:
//   - LPUSHes the JSON-encoded RedisBatchTxs onto the "list" key, and
//   - LPUSHes the hex hashes of the batch's transactions onto a key named
//     after the hex-encoded batch hash (skipped when the batch has no
//     transactions, since LPUSH requires at least one value).
//
// Each loop iteration is paced by a rate limiter (one iteration per 100ms),
// and a progress line is logged every five seconds. The first call also
// stores the client in the package-level redisCli.
//
// Start never returns normally; it panics on any encoding or Redis error.
func Start(redisAddr, passwd string) {
	logger := logging.NewLogrusLogger(fmt.Sprintf("redis[%s]", redisAddr))

	client := initClient(10, redisAddr, passwd)
	if redisCli == nil {
		redisCli = client
	}

	// Nothing ever cancels this loop, so a plain background context is used
	// rather than a cancellable one whose cancel func would be discarded.
	ctx := context.Background()

	count := 0
	limiter := rate.NewLimiter(rate.Every(100*time.Millisecond), 1)

	logTicker := time.NewTicker(5 * time.Second)
	defer logTicker.Stop()

	for {
		if err := limiter.Wait(ctx); err != nil {
			panic(fmt.Errorf("waiting for rate limiter: %w", err))
		}

		select {
		case batchTxs := <-batchTxsForRedis:
			txCount := len(batchTxs.Txs)
			// Non-nil even when empty so the JSON payload carries "[]" rather than "null".
			txBytes := make([]TxBytes, 0, txCount)
			hashHexes := make([]string, 0, txCount)

			for _, v := range batchTxs.Txs {
				var txHash common.Hash = v.Tx.Hash()
				hashHexes = append(hashHexes, txHash.Hex())

				txAsBytes, err := v.Tx.MarshalBinary()
				if err != nil {
					panic(err)
				}

				txBytes = append(txBytes, TxBytes{
					From: v.From,
					Tx:   txAsBytes,
				})
			}

			batchTxsAsBytes, err := json.Marshal(RedisBatchTxs{
				Hash: batchTxs.Hash,
				Txs:  txBytes,
			})
			if err != nil {
				panic(err)
			}

			fmt.Printf("batchTxs.Hash: %x  count: %d  \n", batchTxs.Hash, count)

			if err := client.LPush(ctx, "list", batchTxsAsBytes).Err(); err != nil {
				panic(err)
			}

			// LPush returns a command object, not an error; the failure (if
			// any) must be read through Err(). An empty batch is skipped
			// because the server rejects LPUSH without values.
			if len(hashHexes) > 0 {
				hashKey := fmt.Sprintf("%x", batchTxs.Hash)
				if err := client.LPush(ctx, hashKey, hashHexes).Err(); err != nil {
					panic(fmt.Errorf("pushing tx hashes for batch %s: %w", hashKey, err))
				}
			}

			count++

		case <-logTicker.C:
			logger.Info("Sending batchTxs to redis", "idx", count, "totaltxsCount", batchTxSize*count, "now", time.Now().Format("15:04:05"))
		}
	}
}

// TxBytes is the JSON form of a single transaction inside a RedisBatchTxs
// payload. encoding/json renders []byte fields as base64 strings.
type TxBytes struct {
	// From is copied from the source transaction's From field by Start.
	// NOTE(review): presumably the sender address bytes — confirm with the producer.
	From []byte `json:"From"`
	// Tx is the transaction as returned by its MarshalBinary method; it is
	// serialized under the JSON key "TxBytes".
	Tx   []byte `json:"TxBytes"`
}

// RedisBatchTxs is the payload that Start JSON-encodes and LPUSHes onto the
// Redis "list" key: one batch of transactions plus the batch's hash.
type RedisBatchTxs struct {
	// Txs holds one entry per transaction in the batch.
	Txs  []TxBytes `json:"Txs"`
	// Hash is the batch hash; Start also uses its hex encoding as the Redis
	// key under which the batch's transaction hashes are stored.
	Hash []byte    `json:"Hash"`
}