Commit f06024aa authored by 李伟@五瓣科技's avatar 李伟@五瓣科技

test

parent 42718ddf
......@@ -37,12 +37,18 @@ var rootCmd = &cobra.Command{
Use: "sendTxs",
Short: "send batch txs hash to chain and original txs to redis",
Run: func(cmd *cobra.Command, args []string) {
//go func() {
// if err := multisend.ProduceOriginalTx(); err != nil {
// panic(err)
// }
//}()
cfg := multisend.Config{
Rate: rate,
Count: count,
Connections: 1,
Time: int(expectedTime),
SendPeriod: int(sendPeriod),
Time: expectedTime,
SendPeriod: sendPeriod,
ClientFactory: "ethclient",
SendTxPrivateKey: sendTxPrivateKey,
}
......@@ -55,14 +61,15 @@ var rootCmd = &cobra.Command{
transactor.Start()
go func() {
if err := multisend.ProduceOriginalTx(); err != nil {
panic(err)
}
// go func() {
w := multisend.WebServicer{}
if err := w.WebService(cfg); err != nil {
panic(err)
}
}()
// }()
multisend.Start(redisAddr, redisPasswd)
//multisend.Start(redisAddr, redisPasswd)
},
}
......@@ -4,7 +4,8 @@ go 1.17
require (
github.com/StackExchange/wmi v0.0.0-20180116203802-5d049714c4a6 // indirect
github.com/btcsuite/btcd v0.20.1-beta // indirect
github.com/btcsuite/btcd v0.21.0-beta // indirect
github.com/btcsuite/btcutil v1.0.3-0.20201208143702-a53e38424cce // indirect
github.com/cbergoon/merkletree v0.2.0 // indirect
github.com/cespare/xxhash/v2 v2.1.2 // indirect
github.com/deckarep/golang-set v1.8.0 // indirect
......@@ -15,14 +16,19 @@ require (
github.com/go-redis/redis_rate/v9 v9.1.2 // indirect
github.com/go-stack/stack v1.8.0 // indirect
github.com/golang/protobuf v1.5.2 // indirect
github.com/google/uuid v1.3.0 // indirect
github.com/gorilla/mux v1.8.0 // indirect
github.com/gorilla/websocket v1.4.2 // indirect
github.com/inconshreveable/mousetrap v1.0.0 // indirect
github.com/miguelmota/go-ethereum-hdwallet v0.1.1 // indirect
github.com/rs/cors v1.8.2 // indirect
github.com/shirou/gopsutil v3.21.4-0.20210419000835-c7a38de76ee5+incompatible // indirect
github.com/sirupsen/logrus v1.8.1 // indirect
github.com/spf13/cobra v1.3.0 // indirect
github.com/spf13/pflag v1.0.5 // indirect
github.com/tklauser/go-sysconf v0.3.5 // indirect
github.com/tklauser/numcpus v0.2.2 // indirect
github.com/tyler-smith/go-bip39 v1.0.1-0.20181017060643-dbb3b84ba2ef // indirect
golang.org/x/crypto v0.0.0-20210817164053-32db794688a5 // indirect
golang.org/x/sys v0.0.0-20211205182925-97ca703d548d // indirect
golang.org/x/time v0.0.0-20220224211638-0e9765cccd65 // indirect
......
This diff is collapsed.
package multisend
import (
"crypto/sha256"
"encoding/json"
"fmt"
"log"
"math/big"
"net/http"
"strconv"
"sync"
"github.com/google/uuid"
"github.com/gorilla/mux"
hdwallet "github.com/miguelmota/go-ethereum-hdwallet"
"github.com/rs/cors"
)
var mnemonic = "matter someone fee garlic final police during vapor stool cargo snake dove"
var processMap sync.Map
func GetSendRecord(id uuid.UUID) (SendRecord, bool) {
if vAsInterface, ok := processMap.Load(id); ok {
if sendR, ok := vAsInterface.(SendRecord); ok {
return sendR, true
}
}
return SendRecord{}, false
}
func SetSendRecord(id uuid.UUID, value SendRecord) {
processMap.Store(id, value)
}
type SendRecord struct {
Percent float64 `json:"percent"`
TotalConsTx int64 `json:"total_cons_tx"`
SendRecord []BatchSend `json:"send_record"`
}
type BatchSend struct {
BeginTime int64 `json:"begin_time"`
EndTime int64 `json:"end_time"`
TxNum int `json:"tx_num"`
}
type WebServicer struct {
cli Client
}
func (web *WebServicer) AddressHandler(w http.ResponseWriter, r *http.Request) {
wallet, err := hdwallet.NewFromMnemonic(mnemonic)
if err != nil {
log.Fatal(err)
}
resp := struct {
From string `json:"from"`
ToAddrs []string `json:"to_addrs"`
}{}
for i := 0; i <= 1000; i++ {
path := hdwallet.MustParseDerivationPath(fmt.Sprintf("m/44'/60'/0'/0/%d", i))
account, err := wallet.Derive(path, false)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
if i == 0 {
resp.From = account.Address.Hex()
continue
}
resp.ToAddrs = append(resp.ToAddrs, account.Address.Hex())
}
respAsJson, err := json.Marshal(resp)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
w.Header().Set("Content-Type", "application/json")
w.WriteHeader(http.StatusOK)
w.Write(respAsJson)
}
func (web *WebServicer) WebService(config Config) error {
r := mux.NewRouter()
// Routes consist of a path and a handler function.
r.HandleFunc("/addrs", web.AddressHandler)
//r.HandleFunc("/faucet/{addr}", web.FaucetHandler)
r.HandleFunc("/process/{uuid}", web.ProcessHandler)
r.HandleFunc("/txs", web.TxsHandler).Methods("POST")
clientFactory, exists := clientFactories["ethclient"]
if !exists {
return fmt.Errorf("unrecognized client factory: %s", "ethclient")
}
client, err := clientFactory.NewClient(config)
if err != nil {
return err
}
web.cli = client
handler := cors.Default().Handler(r)
// Bind to a port and pass our router in
return http.ListenAndServe(":8000", handler)
}
// func (web *WebServicer) FaucetHandler(w http.ResponseWriter, r *http.Request) {
// vars := mux.Vars(r)
// myString := vars["addr"]
// fmt.Printf("request addr: %s \n", myString)
// w.Header().Set("Content-Type", "application/json")
// w.WriteHeader(http.StatusOK)
// w.Write([]byte(myString))
// }
func (web *WebServicer) ProcessHandler(w http.ResponseWriter, r *http.Request) {
vars := mux.Vars(r)
uuidStr := vars["uuid"]
fmt.Printf("request addr: %s \n", uuidStr)
w.Header().Set("Content-Type", "application/json")
w.WriteHeader(http.StatusOK)
id, err := uuid.Parse(uuidStr)
if err != nil {
http.Error(w, err.Error(), http.StatusBadRequest)
return
}
if record, ok := GetSendRecord(id); ok {
sent := 0
for _, v := range record.SendRecord {
sent += v.TxNum
}
fmt.Printf("record.TotalConTx: %d sent: %d\n", record.TotalConsTx, sent)
record.Percent = FloatRound(float64(sent)/float64(record.TotalConsTx), 2)
recordAsJon, err := json.Marshal(record)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
w.Header().Set("Content-Type", "application/json")
w.WriteHeader(http.StatusOK)
w.Write(recordAsJon)
return
}
if err != nil {
http.Error(w, fmt.Sprintf("can not find the uuid: %s ", id.String()), http.StatusInternalServerError)
return
}
}
func FloatRound(f float64, n int) float64 {
format := "%." + strconv.Itoa(n) + "f"
res, _ := strconv.ParseFloat(fmt.Sprintf(format, f), 64)
return res
}
type TxParams struct {
From string `json:"from,omitempty"`
RequestAmount int64 `json:"request_amount"`
ToAddrs []string `json:"to_addrs"`
BatchTxSize int64 `json:"batch_tx_size,omitempty"`
BatchTxHashSize int64 `json:"batch_tx_hash_size,omitempty"`
TxCount int64 `json:"tx_count"`
EveryTxAmount int64 `json:"every_tx_amount"`
//Rate int64 `json:"rate"`
//ExpectedTime int64 `json:"expected_time"`
}
func (web *WebServicer) TxsHandler(w http.ResponseWriter, r *http.Request) {
var params TxParams
decoder := json.NewDecoder(r.Body)
if err := decoder.Decode(&params); err != nil {
http.Error(w, err.Error(), http.StatusBadRequest)
return
}
if params.TxCount > 1000*1000 { //百万
http.Error(w, fmt.Sprintf("max tx count 1000*1000 "), http.StatusBadRequest)
return
}
// 有余额的话,金额不好判断,由前端校验;
// if params.EveryTxAmount * params.TxCount < params.RequestAmount{
// }
//startTime := time.Now()
res, err := web.ProduceTxs(params.From, params.ToAddrs, int(params.TxCount), params.EveryTxAmount)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
//fmt.Printf("takes time: %s \n", time.Since(startTime).String())
resAsJson, err := json.Marshal(res)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
fmt.Printf("len(resAsJson): %d \n", len(resAsJson))
w.Header().Set("Content-Type", "application/json")
w.WriteHeader(http.StatusOK)
w.Write(resAsJson)
}
type BatchTx struct {
BatchHash string `json:"batch_hash"`
OriginalTxHash []string `json:"original_tx_hash"`
}
type ConsTxHash struct {
TxHash string `json:"tx_hash"`
Batches []BatchTx `json:"batches"`
}
type WebResp struct {
ProcessId uuid.UUID `json:"process_id"`
AllTxs []ConsTxHash `json:"all_txs"`
}
func (web *WebServicer) ProduceTxs(fromAddr string, toAddrs []string, txCount int, amount int64) (*WebResp, error) {
consTxHashs := []ConsTxHash{}
batches := []BatchTx{}
id := uuid.New()
consTxNum := 0
for {
var hashesBytes []byte = make([]byte, 0, 32*batchTxHashSize)
for j := 0; j < batchTxHashSize; j++ {
var txsBytes []byte
var txs []TxWithFrom = make([]TxWithFrom, 0, batchTxSize)
txshash := make([]string, 0, batchTxSize)
for i := 0; i < batchTxSize; i++ {
tx, err := buildOriginalTx(originalTxParam.Nonce, toAddress, big.NewInt(256), nil)
if err != nil {
return nil, err
}
originalTxParam.Nonce += 1
txAsBytes, err := tx.MarshalBinary()
if err != nil {
return nil, err
}
txsBytes = append(txsBytes, txAsBytes...)
txs = append(txs, TxWithFrom{
originalTxParam.FromAddr[:],
txAsBytes})
txshash = append(txshash, tx.Hash().String())
txCount--
if txCount == 0 {
break
}
}
h := sha256.New()
if _, err := h.Write(txsBytes); err != nil {
return nil, err
}
hashBytes := h.Sum(nil)
hashesBytes = append(hashesBytes, hashBytes...)
batch := BatchTx{
BatchHash: fmt.Sprintf("%x", hashBytes),
OriginalTxHash: txshash,
}
batches = append(batches, batch)
batchTxs := OriginalBatchTxs{Hash: hashBytes, Txs: txs}
batchTxsForRedis <- &batchTxs
if txCount == 0 {
break
}
}
originalTxsHashQueue <- &hashesBytes
tx, err := web.cli.GenerateTx()
if err != nil {
return nil, err
}
conTxsQueue <- ConTxsWithId{
Id: id,
Tx: tx}
consTxNum++
consTxHash := ConsTxHash{
TxHash: tx.Hash().Hex(),
Batches: batches,
}
consTxHashs = append(consTxHashs, consTxHash)
if txCount == 0 {
break
}
}
SetSendRecord(id, SendRecord{TotalConsTx: int64(consTxNum)})
return &WebResp{
ProcessId: id,
AllTxs: consTxHashs}, nil
}
package multisend
import (
"testing"
)
func TestWebService(t *testing.T) {
// WebService()
// w :=WebServicer{}
}
......@@ -8,19 +8,27 @@ import (
"crypto/sha256"
"github.com/ethereum/go-ethereum/crypto"
//"github.com/cbergoon/merkletree"
"github.com/google/uuid"
"github.com/ethereum/go-ethereum/core/types"
)
var originalTxParam EthClient
var originTxPrivateKey string = "9e0944f587e1043d6e303644738b0c7c77ed15b176ca574ed0be40c0b9bbdc3a"
var originalTxPrivateKey string = "9e0944f587e1043d6e303644738b0c7c77ed15b176ca574ed0be40c0b9bbdc3a"
var originalTxsHashQueue chan *[]byte = make(chan *[]byte, 1000)
var batchTxsForRedis chan *OriginalBatchTxs = make(chan *OriginalBatchTxs, batchTxHashSize*batchTxHashQueueSize)
var batchTxsForRedis chan *OriginalBatchTxs = make(chan *OriginalBatchTxs, 10000000)
var conTxsQueue chan ConTxsWithId = make(chan ConTxsWithId, 1000)
const batchTxSize = 10000
const batchTxHashSize = 30
const batchTxSize = 10
const batchTxHashSize = 3
const batchTxHashQueueSize = 10
type ConTxsWithId struct {
Id uuid.UUID
Tx *types.Transaction
}
type OriginalBatchTxs struct {
Txs []TxWithFrom
Hash []byte
......@@ -33,7 +41,7 @@ type TxWithFrom struct {
func init() {
originalTxPrivatekeyAsECDSA, err := crypto.HexToECDSA(originTxPrivateKey)
originalTxPrivatekeyAsECDSA, err := crypto.HexToECDSA(originalTxPrivateKey)
if err != nil {
panic(err)
}
......@@ -55,8 +63,6 @@ func init() {
func ProduceOriginalTx() error {
for {
//fmt.Printf("len(originalTxQueue): %d len(originalMd5TxQueue): %d \n", len(originalTxQueue), len(originalMd5TxQueue))
if len(originalTxsHashQueue) < batchTxHashSize*batchTxHashQueueSize {
var hashesBytes []byte = make([]byte, 0, 32*batchTxHashSize)
......@@ -71,6 +77,7 @@ func ProduceOriginalTx() error {
}
originalTxParam.Nonce += 1
txAsBytes, err := tx.MarshalBinary()
if err != nil {
return err
......@@ -101,6 +108,7 @@ func ProduceOriginalTx() error {
}
}
// func StartProduceTx(redisAddr, passwd string) {
func ProduceOriginalTxForWeb(from string, toAddrs []string, txCount int64) error {
// }
return nil
}
......@@ -33,10 +33,10 @@ func TestProduceTx(t *testing.T) {
time.Sleep(2 * time.Second)
}
go func() {
defer wg.Done()
Start()
}()
// go func() {
// defer wg.Done()
// Start()
// }()
wg.Wait()
}
......@@ -5,8 +5,6 @@ import (
"encoding/json"
"fmt"
//"fmt"
"runtime"
"time"
"code.wuban.net.cn/multisend/internal/logging"
......@@ -14,15 +12,6 @@ import (
"golang.org/x/time/rate"
)
var (
jobnum = runtime.NumCPU()
)
type Job struct {
Client *redis.Client
Id int
}
func initClient(poolSize int, redisAddr, passwd string) *redis.Client {
client := redis.NewClient(&redis.Options{
Addr: redisAddr,
......
package multisend
import (
"context"
"fmt"
"testing"
"time"
"github.com/go-redis/redis_rate/v9"
)
func TestRateLimit(t *testing.T) {
client:= initClient(10)
// client:= initClient(10)
ctx := context.Background()
limiter := redis_rate.NewLimiter(client)
// ctx := context.Background()
// limiter := redis_rate.NewLimiter(client)
for i := 0; i < 10; i++ {
res, err := limiter.Allow(ctx, "project:123", redis_rate.PerSecond(10))
if err != nil {
panic(err)
}
// for i := 0; i < 10; i++ {
// res, err := limiter.Allow(ctx, "project:123", redis_rate.PerSecond(10))
// if err != nil {
// panic(err)
// }
client.Set(ctx, "key", i, time.Second*100)
fmt.Println("allowed", res.Allowed, "remaining", res.Remaining)
// client.Set(ctx, "key", i, time.Second*100)
// fmt.Println("allowed", res.Allowed, "remaining", res.Remaining)
}
// }
}
......@@ -12,6 +12,7 @@ import (
"code.wuban.net.cn/multisend/internal/logging"
"github.com/ethereum/go-ethereum/common/hexutil"
"github.com/google/uuid"
"github.com/gorilla/websocket"
)
......@@ -250,43 +251,59 @@ func (t *Transactor) sendTransactions() error {
var sent int
var sentBytes int64
defer func() {
fmt.Printf("sent %d \n", sent)
t.trackSentTxs(sent, sentBytes)
}()
t.logger.Info("Sending batch of transactions", "now", time.Now().Format("15:04:05"), "toSend", toSend)
batchStartTime := time.Now()
id := uuid.UUID{}
for ; sent < toSend; sent++ {
tx, err := t.client.GenerateTx()
if err != nil {
return err
}
select {
case txWithId := <-conTxsQueue:
id = txWithId.Id
data, err := tx.MarshalBinary()
if err != nil {
return err
}
data, err := txWithId.Tx.MarshalBinary()
if err != nil {
return err
}
args := hexutil.Encode(data)
method := "eth_sendRawTransaction"
args := hexutil.Encode(data)
method := "eth_sendRawTransaction"
msg, err := t.newMessage(method, args)
if err != nil {
return err
}
//op := &requestOp{ids: []json.RawMessage{msg.ID}, resp: make(chan *jsonrpcMessage, 1)}
//return ec.c.CallContext(ctx, nil, "eth_sendRawTransaction", hexutil.Encode(data))
if err := t.writeTx(msg); err != nil {
return err
fmt.Printf("\n%s\n", args)
msg, err := t.newMessage(method, args)
if err != nil {
return err
}
//op := &requestOp{ids: []json.RawMessage{msg.ID}, resp: make(chan *jsonrpcMessage, 1)}
//return ec.c.CallContext(ctx, nil, "eth_sendRawTransaction", hexutil.Encode(data))
if err := t.writeTx(msg); err != nil {
return err
}
sentBytes += int64(len(data))
//return ec.c.CallContext(ctx, nil, "eth_sendRawTransaction", hexutil.Encode(data))
// if we have to make way for the next batch
if time.Since(batchStartTime) >= time.Duration(t.config.SendPeriod)*time.Second {
fmt.Printf("time.Since(batchStartTime): %s \n", fmt.Sprintf("%.20f", time.Since(batchStartTime).Seconds()))
break
}
default:
t.logger.Info("there is no tx in queue", "now", time.Now().Format("15:04:05"))
}
sentBytes += int64(len(data))
}
//return ec.c.CallContext(ctx, nil, "eth_sendRawTransaction", hexutil.Encode(data))
// if we have to make way for the next batch
if time.Since(batchStartTime) >= time.Duration(t.config.SendPeriod)*time.Second {
fmt.Printf("time.Since(batchStartTime): %s \n", fmt.Sprintf("%.20f", time.Since(batchStartTime).Seconds()))
break
if record, ok := GetSendRecord(id); ok {
b := BatchSend{
BeginTime: batchStartTime.Unix(),
EndTime: time.Now().Unix(),
TxNum: sent,
}
record.SendRecord = append(record.SendRecord, b)
SetSendRecord(id, record)
}
return nil
}
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment