redis.go 1.35 KB
package multisend

import (
	"context"
	"encoding/json"
	"fmt"

	"time"

	"code.wuban.net.cn/multisend/internal/logging"
	"github.com/go-redis/redis/v8"
	"golang.org/x/time/rate"
)

func initClient(poolSize int, redisAddr, passwd string) *redis.Client {
	client := redis.NewClient(&redis.Options{
		Addr:         redisAddr,
		DialTimeout:  time.Second,
		ReadTimeout:  time.Second,
		WriteTimeout: time.Second,
		PoolSize:     poolSize,
		Password:     passwd,
		DB:           0,
	})

	if err := client.FlushAll(context.Background()).Err(); err != nil {
		panic(err)
	}

	return client
}

func Start(redisAddr, passwd string) {

	logger := logging.NewLogrusLogger(fmt.Sprintf("redis[%s]", redisAddr))

	client := initClient(10, redisAddr, passwd)
	count := 0
	limiter := rate.NewLimiter(rate.Every(time.Millisecond*100), 1)
	cxt, _ := context.WithCancel(context.TODO())

	logTicker := time.NewTicker(5 * time.Second)

	for {
		limiter.Wait(cxt)
		select {
		case batchTxs := <-batchTxsForRedis:

			batchTxsAsBytes, err := json.Marshal(batchTxs)

			if err != nil {
				panic(err)
			}

			if err := client.LPush(context.Background(), "list", batchTxsAsBytes).Err(); err != nil {
				panic(err)
			}
			count += 1

		case <-logTicker.C:
			logger.Info("Sending batchTxs to redis", "idx", count, "totaltxsCount", batchTxSize*count, "now", time.Now().Format("15:04:05"))

		}
	}
}