Commit 9720a44f authored by 李伟@五瓣科技's avatar 李伟@五瓣科技

add cmd

parent 8fcd8558
package main
func main() {
Execute()
}
package main
import (
"fmt"
"os"
"sync"
"code.wuban.net.cn/multisend"
"code.wuban.net.cn/multisend/internal/logging"
"github.com/spf13/cobra"
)
func Execute() {
if err := rootCmd.Execute(); err != nil {
fmt.Println(err)
os.Exit(1)
}
}
var (
rate, sendPeriod, count, expectedTime int
websocketAddr, redisAddr, redisPasswd string
)
func init() {
//cobra.OnInitialize(initConfig)
rootCmd.PersistentFlags().StringVar(&websocketAddr, "websocketAddr", "ws://13.40.31.153:8546", "eth classical websocket rpc addr")
rootCmd.PersistentFlags().IntVar(&rate, "rate", 1, "every period send tx number")
rootCmd.PersistentFlags().IntVar(&sendPeriod, "sendPeriod", 3, "send tx time unit")
rootCmd.PersistentFlags().IntVar(&count, "count", 100, "total tx number")
rootCmd.PersistentFlags().IntVar(&expectedTime, "expectedTime", 100, "the expected time used to send the total tx number")
rootCmd.PersistentFlags().StringVar(&redisAddr, "redisAddr", "127.0.0.1:6379", "commit the original txs to the redis queue")
rootCmd.PersistentFlags().StringVar(&redisPasswd, "redisPasswd", "redis20220217", "redis password")
}
var rootCmd = &cobra.Command{
Use: "sendTxs",
Short: "send batch txs hash to chain and original txs to redis",
Run: func(cmd *cobra.Command, args []string) {
logger := logging.NewLogrusLogger("cmd")
cfg := multisend.Config{
Rate: rate,
Count: count,
Connections: 1,
Time: int(expectedTime),
SendPeriod: int(sendPeriod),
ClientFactory: "ethclient",
}
transactor, err := multisend.NewTransactor(websocketAddr, &cfg)
if err != nil {
logger.Error(err.Error())
return
}
transactor.Start()
wg := sync.WaitGroup{}
wg.Add(1)
go func() {
defer wg.Done()
if err := multisend.ProduceOriginalTx(); err != nil {
logger.Error(err.Error())
return
}
}()
multisend.Start(redisAddr, redisPasswd)
},
}
...@@ -16,12 +16,15 @@ require ( ...@@ -16,12 +16,15 @@ require (
github.com/go-stack/stack v1.8.0 // indirect github.com/go-stack/stack v1.8.0 // indirect
github.com/golang/protobuf v1.5.2 // indirect github.com/golang/protobuf v1.5.2 // indirect
github.com/gorilla/websocket v1.4.2 // indirect github.com/gorilla/websocket v1.4.2 // indirect
github.com/inconshreveable/mousetrap v1.0.0 // indirect
github.com/shirou/gopsutil v3.21.4-0.20210419000835-c7a38de76ee5+incompatible // indirect github.com/shirou/gopsutil v3.21.4-0.20210419000835-c7a38de76ee5+incompatible // indirect
github.com/sirupsen/logrus v1.8.1 // indirect github.com/sirupsen/logrus v1.8.1 // indirect
github.com/spf13/cobra v1.3.0 // indirect
github.com/spf13/pflag v1.0.5 // indirect
github.com/tklauser/go-sysconf v0.3.5 // indirect github.com/tklauser/go-sysconf v0.3.5 // indirect
github.com/tklauser/numcpus v0.2.2 // indirect github.com/tklauser/numcpus v0.2.2 // indirect
golang.org/x/crypto v0.0.0-20210322153248-0c34fe9e7dc2 // indirect golang.org/x/crypto v0.0.0-20210817164053-32db794688a5 // indirect
golang.org/x/sys v0.0.0-20210816183151-1e6c022a8912 // indirect golang.org/x/sys v0.0.0-20211205182925-97ca703d548d // indirect
golang.org/x/time v0.0.0-20220224211638-0e9765cccd65 // indirect golang.org/x/time v0.0.0-20220224211638-0e9765cccd65 // indirect
google.golang.org/protobuf v1.27.1 // indirect google.golang.org/protobuf v1.27.1 // indirect
gopkg.in/natefinch/npipe.v2 v2.0.0-20160621034901-c1b8fa8bdcce // indirect gopkg.in/natefinch/npipe.v2 v2.0.0-20160621034901-c1b8fa8bdcce // indirect
......
This diff is collapsed.
...@@ -2,9 +2,7 @@ package multisend ...@@ -2,9 +2,7 @@ package multisend
import ( import (
"crypto/ecdsa" "crypto/ecdsa"
"fmt"
"math/big" "math/big"
"sync"
"time" "time"
"crypto/sha256" "crypto/sha256"
...@@ -98,56 +96,11 @@ func ProduceOriginalTx() error { ...@@ -98,56 +96,11 @@ func ProduceOriginalTx() error {
} }
originalTxsHashQueue <- &hashesBytes originalTxsHashQueue <- &hashesBytes
} else { } else {
//return nil time.Sleep(time.Millisecond * 100)
time.Sleep(time.Hour * 1)
} }
} }
} }
// var ctx = context.Background() // func StartProduceTx(redisAddr, passwd string) {
// var rdb = redis.NewClient(&redis.Options{
// Addr: "54.250.115.98:6379",
// Password: "redis20220217", // no password set
// DB: 0, // use default DB
// })
// func SendMd5Tx() {
// //超时 超量
// // count := 0
// // sendTicker := time.NewTicker(time.Duration(5) * time.Second)
// for {
// select {
// case txs := <-originalTxsWithFromQueue:
// txsAsJson, err := json.Marshal(txs)
// if err != nil {
// fmt.Println(err.Error())
// continue
// }
// rdb.LPush(ctx, "list", txsAsJson)
// }
// }
// } // }
func StartProduceTx() {
wg := sync.WaitGroup{}
wg.Add(2)
go func() {
defer wg.Done()
if err := ProduceOriginalTx(); err != nil {
fmt.Printf("ProduceOriginalTx stop err: %s\n", err.Error())
}
}()
go func() {
defer wg.Done()
Start()
}()
wg.Wait()
}
...@@ -3,6 +3,7 @@ package multisend ...@@ -3,6 +3,7 @@ package multisend
import ( import (
"context" "context"
"encoding/json" "encoding/json"
//"fmt" //"fmt"
"runtime" "runtime"
"time" "time"
...@@ -20,14 +21,14 @@ type Job struct { ...@@ -20,14 +21,14 @@ type Job struct {
Id int Id int
} }
func initClient(poolSize int) *redis.Client { func initClient(poolSize int, redisAddr, passwd string) *redis.Client {
client := redis.NewClient(&redis.Options{ client := redis.NewClient(&redis.Options{
Addr: "127.0.0.1:6379", Addr: redisAddr,
DialTimeout: time.Second, DialTimeout: time.Second,
ReadTimeout: time.Second, ReadTimeout: time.Second,
WriteTimeout: time.Second, WriteTimeout: time.Second,
PoolSize: poolSize, PoolSize: poolSize,
Password: "redis20220217", Password: passwd,
DB: 0, DB: 0,
}) })
...@@ -38,9 +39,9 @@ func initClient(poolSize int) *redis.Client { ...@@ -38,9 +39,9 @@ func initClient(poolSize int) *redis.Client {
return client return client
} }
func Start() { func Start(redisAddr, passwd string) {
client := initClient(10) client := initClient(10, redisAddr, passwd)
count := 0 count := 0
limiter := rate.NewLimiter(rate.Every(time.Millisecond*100), 1) limiter := rate.NewLimiter(rate.Every(time.Millisecond*100), 1)
cxt, _ := context.WithCancel(context.TODO()) cxt, _ := context.WithCancel(context.TODO())
......
...@@ -78,7 +78,7 @@ func NewTransactor(remoteAddr string, config *Config) (*Transactor, error) { ...@@ -78,7 +78,7 @@ func NewTransactor(remoteAddr string, config *Config) (*Transactor, error) {
return nil, fmt.Errorf("failed to connect to remote WebSockets endpoint %s: %s (status code %d)", remoteAddr, resp.Status, resp.StatusCode) return nil, fmt.Errorf("failed to connect to remote WebSockets endpoint %s: %s (status code %d)", remoteAddr, resp.Status, resp.StatusCode)
} }
logger := logging.NewLogrusLogger(fmt.Sprintf("transactor[%s]", u.String())) logger := logging.NewLogrusLogger(fmt.Sprintf("transactor[%s]", u.String()))
logger.Info("Connected to remote Tendermint WebSockets RPC") logger.Info("Connected to remote ETH WebSockets RPC")
return &Transactor{ return &Transactor{
remoteAddr: u.String(), remoteAddr: u.String(),
config: config, config: config,
......
...@@ -7,7 +7,7 @@ import ( ...@@ -7,7 +7,7 @@ import (
func TestTransactor(t *testing.T) { func TestTransactor(t *testing.T) {
go StartProduceTx() go StartProduceTx("127.0.0.1:6379", "redis20220217")
//for { //for {
// if len(originalTxsHashQueue) >= 10 { // if len(originalTxsHashQueue) >= 10 {
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment