Commit c745987c authored by gshx's avatar gshx

Merge branch 'heco-testnet' of https://code.wuban.net.cn/liwei/multisend into heco-testnet

parents 7dc6b4cb a2d0f5fa
......@@ -22,6 +22,7 @@ type Client interface {
// GenerateTx must generate a raw transaction to be sent to the relevant
// broadcast_tx method for a given endpoint.
GenerateTx() (*types.Transaction, error)
BuildTx(data *[]byte) (*types.Transaction, error)
}
// Our global registry of client factories
......
......@@ -91,6 +91,7 @@ func (f *EthClientFactory) NewClient(cfg Config) (Client, error) {
buildTxParam.Nonce = nonce
buildTxParam.GasPrice = gasPrice
buildTxParam.ChainId = ChainId
buildTxParam.GasLimit = 2000000
return &buildTxParam, nil
}
......@@ -137,3 +138,25 @@ func (c *EthClient) GenerateTx() (*types.Transaction, error) {
}
}
func (c *EthClient) BuildTx(data *[]byte) (*types.Transaction, error) {
tx, err := buildSendTx(c.Nonce, toAddress, big.NewInt(0), c.GasLimit, c.GasPrice, *data, c.ChainId, c.PrivateKey)
if err != nil {
return nil, err
}
txAsBytes, err := tx.MarshalBinary()
if err != nil {
return nil, err
}
txAsHex := hexutil.Encode(txAsBytes)
c.logger.Info("build tx", "nonce", c.Nonce, "TxAsHex", txAsHex)
c.Nonce += 1
return tx, nil
}
......@@ -39,13 +39,6 @@ var rootCmd = &cobra.Command{
Use: "sendTxs",
Short: "send batch txs hash to chain and original txs to redis",
Run: func(cmd *cobra.Command, args []string) {
//go func() {
// if err := multisend.ProduceOriginalTx(); err != nil {
// panic(err)
// }
//}()
fmt.Printf("webStaticDir: %s \n", webStaticDir)
cfg := multisend.Config{
Rate: rate,
......@@ -56,15 +49,16 @@ var rootCmd = &cobra.Command{
ClientFactory: "ethclient",
SendTxPrivateKey: sendTxPrivateKey,
WebStaticDir: webStaticDir,
WebsocketAddr: websocketAddr,
}
transactor, err := multisend.NewTransactor(websocketAddr, &cfg)
// transactor, err := multisend.NewTransactor(websocketAddr, &cfg)
if err != nil {
panic(err)
}
// if err != nil {
// panic(err)
// }
transactor.Start()
// transactor.Start()
go func() {
w := multisend.WebServicer{}
......
......@@ -38,6 +38,7 @@ type Config struct {
NoTrapInterrupts bool `json:"no_trap_interrupts"` // Should we avoid trapping Ctrl+Break? Only relevant for standalone execution mode.
SendTxPrivateKey string `json:"send_tx_private_key"` // Send tx to the chain with the private key.
WebStaticDir string `json:"web_static_dir"`
WebsocketAddr string
}
var validBroadcastTxMethods = map[string]interface{}{
......
......@@ -116,7 +116,8 @@ type BatchSend struct {
}
type WebServicer struct {
cli Client
transactor *Transactor
cli Client
}
const MaxToAddrsNum = 1000
......@@ -374,6 +375,15 @@ func (web *WebServicer) WebService(config Config) error {
web.cli = client
transactor, err := NewTransactor(config.WebsocketAddr, &config)
if err != nil {
return err
}
transactor.Start()
web.transactor = transactor
handler := cors.Default().Handler(r)
// Bind to a port and pass our router in
return http.ListenAndServe(":8000", handler)
......@@ -491,7 +501,7 @@ func (web *WebServicer) TxsHandler(w http.ResponseWriter, r *http.Request) {
id := uuid.New()
consTxNum := 0
//consTxNum := 0
total := int(params.TxCount)
......@@ -499,18 +509,21 @@ func (web *WebServicer) TxsHandler(w http.ResponseWriter, r *http.Request) {
total = total + 1
}
consTxNum = total / (batchTxHashSize * batchTxSize)
if total%(batchTxHashSize*batchTxSize) != 0 {
consTxNum = consTxNum + 1
}
// consTxNum = total / (batchTxHashSize * batchTxSize)
// if total%(batchTxHashSize*batchTxSize) != 0 {
// consTxNum = consTxNum + 1
// }
SetSendRecord(id, SendRecord{TotalConsTx: int64(consTxNum)})
SetSendRecord(id, SendRecord{TotalConsTx: int64(total)})
go func() {
if err := web.ProduceTxs(params.From, params.ToAddrs, int(params.TxCount), params.EveryTxAmount, id, params.RequestAmount); err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
if err := web.sendLoop(params.From, params.ToAddrs, int(params.TxCount), params.EveryTxAmount, id, params.RequestAmount); err != nil {
//if err := web.ProduceTxs(params.From, params.ToAddrs, int(params.TxCount), params.EveryTxAmount, id, params.RequestAmount); err != nil {
fmt.Printf("web send loop, id: %s err: %s \n", id, err.Error())
}
atomic.StoreInt32(&Running, 0)
}()
resAsJson, err := json.Marshal(id)
......@@ -547,6 +560,148 @@ type WebResp struct {
AllTxs []ConsTxHashs `json:"all_txs"`
}
func (web *WebServicer) sendLoop(fromAddr string, toAddrs []string, txCount int, amount int64, id uuid.UUID, requestAmount int64) error {
addrsL := len(toAddrs)
first := true
sendTicker := time.NewTicker(time.Duration(3) * time.Second)
for {
select {
case <-sendTicker.C:
var hashesBytes []byte = make([]byte, 0, 32*batchTxHashSize)
var beginOriginalTx common.Hash
var endOriginalTx common.Hash
var sendRedisBeginTime time.Time
var beginTotal = txCount
for j := 0; j < batchTxHashSize; j++ {
var txsBytes []byte
var txs []TxWithFrom = make([]TxWithFrom, 0, batchTxSize)
txshash := make([]string, 0, batchTxSize)
for i := 0; i < batchTxSize; i++ {
var tx *types.Transaction
var err error
//fmt.Printf("param fromAddr: %s systemFromAddr: %s result: %v \n ", fromAddr, systemFromAddr, (fromAddr != systemFromAddr && first))
if fromAddr != systemFromAddr && first {
txCount++
beginTotal++
tx, err = buildOriginalTx(originalTxParam.Nonce, common.HexToAddress(fromAddr), requestAmount, big.NewInt(256), nil)
} else {
//fmt.Printf("amount: %d idx: %d addrsL: %d \n", amount, i, addrsL)
tx, err = buildOriginalTx(originalTxParam.Nonce, common.HexToAddress(toAddrs[i%addrsL]), amount, big.NewInt(256), nil)
}
if err != nil {
return err
}
if j == i && i == 0 {
beginOriginalTx = tx.Hash()
}
endOriginalTx = tx.Hash()
originalTxParam.Nonce += 1
txAsBytes, err := tx.MarshalBinary()
if err != nil {
return err
}
txsBytes = append(txsBytes, txAsBytes...)
if fromAddr != systemFromAddr && first {
txs = append(txs, TxWithFrom{
common.HexToAddress(systemFromAddr).Bytes(),
tx})
first = false
} else {
txs = append(txs, TxWithFrom{
common.HexToAddress(fromAddr).Bytes(),
tx})
}
txshash = append(txshash, tx.Hash().String())
txCount--
if txCount == 0 {
break
}
}
h := sha256.New()
if _, err := h.Write(txsBytes); err != nil {
return err
}
hashBytes := h.Sum(nil)
hashesBytes = append(hashesBytes, hashBytes...)
batchTxs := OriginalBatchTxs{Hash: hashBytes, Txs: txs}
batchTxsForRedis <- &batchTxs
if j == 0 {
sendRedisBeginTime = time.Now()
}
if txCount == 0 {
break
}
}
tx, err := web.cli.BuildTx(&hashesBytes)
if err != nil {
return err
}
if err := web.transactor.SendTx(tx); err != nil {
return err
}
consTxWithBatchs := []ConsTxWithBatchHash{}
consTxWithBatchs = append(consTxWithBatchs, ConsTxWithBatchHash{ConsTxHash: tx.Hash().Bytes(),
BatchTxsHash: hashesBytes})
if record, ok := GetSendRecord(id); ok {
b := BatchSend{
BeginOriginalTx: beginOriginalTx,
EndOriginalTx: endOriginalTx,
SendToRedisBeginTime: sendRedisBeginTime.Unix(),
SendTxsEndTime: time.Now().Unix(),
TxNum: beginTotal - txCount,
ConsTxWithBatchHash: consTxWithBatchs,
}
record.SendRecord = append(record.SendRecord, b)
SetSendRecord(id, record)
}
if txCount == 0 {
break
}
}
if txCount == 0 {
break
}
}
return nil
}
func (web *WebServicer) ProduceTxs(fromAddr string, toAddrs []string, txCount int, amount int64, id uuid.UUID, requestAmount int64) error {
addrsL := len(toAddrs)
......
......@@ -94,7 +94,6 @@ func Start(redisAddr, passwd string) {
}
//fmt.Printf("batchTxHash %s listLen: %d list: %v \n", fmt.Sprintf("%x", batchTxs.Hash), len(hashstr), hashstr)
if err := client.LPush(context.Background(), fmt.Sprintf("%x", batchTxs.Hash), reverse(hashstr)); err != nil {
}
......
......@@ -13,6 +13,7 @@ import (
"code.wuban.net.cn/multisend/internal/logging"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/common/hexutil"
"github.com/ethereum/go-ethereum/core/types"
"github.com/google/uuid"
"github.com/gorilla/websocket"
)
......@@ -238,6 +239,28 @@ func (t *Transactor) setStop(err error) {
t.stopMtx.Unlock()
}
func (t *Transactor) SendTx(tx *types.Transaction) error {
data, err := tx.MarshalBinary()
if err != nil {
return err
}
args := hexutil.Encode(data)
method := "eth_sendRawTransaction"
msg, err := t.newMessage(method, args)
if err != nil {
return err
}
//op := &requestOp{ids: []json.RawMessage{msg.ID}, resp: make(chan *jsonrpcMessage, 1)}
//return ec.c.CallContext(ctx, nil, "eth_sendRawTransaction", hexutil.Encode(data))
if err := t.writeTx(msg); err != nil {
return err
}
return nil
}
func (t *Transactor) sendTransactions() error {
// send as many transactions as we can, up to the send rate
totalSent := t.GetTxCount()
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment