redis.go 1.37 KB
package multisend

import (
	"context"
	"encoding/json"

	//"fmt"
	"runtime"
	"time"

	"github.com/go-redis/redis/v8"
	"golang.org/x/time/rate"
)

var (
	jobnum = runtime.NumCPU()
)

type Job struct {
	Client *redis.Client
	Id     int
}

func initClient(poolSize int, redisAddr, passwd string) *redis.Client {
	client := redis.NewClient(&redis.Options{
		Addr:         redisAddr,
		DialTimeout:  time.Second,
		ReadTimeout:  time.Second,
		WriteTimeout: time.Second,
		PoolSize:     poolSize,
		Password:     passwd,
		DB:           0,
	})

	if err := client.FlushAll(context.Background()).Err(); err != nil {
		panic(err)
	}

	return client
}

func Start(redisAddr, passwd string) {

	client := initClient(10, redisAddr, passwd)
	count := 0
	limiter := rate.NewLimiter(rate.Every(time.Millisecond*100), 1)
	cxt, _ := context.WithCancel(context.TODO())

	for {
		limiter.Wait(cxt)
		select {
		case batchTxs := <-batchTxsForRedis:
			//startTime := time.Now()
			// data, err := proto.Marshal(txs)
			// if err != nil {
			// 	panic(err)
			// }

			batchTxsAsBytes, err := json.Marshal(batchTxs)

			if err != nil {
				panic(err)
			}

			if err := client.LPush(context.Background(), "list", batchTxsAsBytes).Err(); err != nil {
				panic(err)
			}

			count += 1
			//fmt.Printf("count %d txs size: %d  takes %v  time: %s \n", count, len(batchTxsAsBytes), time.Since(startTime), time.Now())
		}
	}
}