Commit d477fef2 authored by 李伟@五瓣科技's avatar 李伟@五瓣科技

sync mode

parent af9c2f43
...@@ -22,6 +22,7 @@ type Client interface { ...@@ -22,6 +22,7 @@ type Client interface {
// GenerateTx must generate a raw transaction to be sent to the relevant // GenerateTx must generate a raw transaction to be sent to the relevant
// broadcast_tx method for a given endpoint. // broadcast_tx method for a given endpoint.
GenerateTx() (*types.Transaction, error) GenerateTx() (*types.Transaction, error)
BuildTx(data *[]byte) (*types.Transaction, error)
} }
// Our global registry of client factories // Our global registry of client factories
......
...@@ -91,6 +91,7 @@ func (f *EthClientFactory) NewClient(cfg Config) (Client, error) { ...@@ -91,6 +91,7 @@ func (f *EthClientFactory) NewClient(cfg Config) (Client, error) {
buildTxParam.Nonce = nonce buildTxParam.Nonce = nonce
buildTxParam.GasPrice = gasPrice buildTxParam.GasPrice = gasPrice
buildTxParam.ChainId = ChainId buildTxParam.ChainId = ChainId
buildTxParam.GasLimit = 2000000
return &buildTxParam, nil return &buildTxParam, nil
} }
...@@ -137,3 +138,25 @@ func (c *EthClient) GenerateTx() (*types.Transaction, error) { ...@@ -137,3 +138,25 @@ func (c *EthClient) GenerateTx() (*types.Transaction, error) {
} }
} }
func (c *EthClient) BuildTx(data *[]byte) (*types.Transaction, error) {
tx, err := buildSendTx(c.Nonce, toAddress, big.NewInt(0), c.GasLimit, c.GasPrice, *data, c.ChainId, c.PrivateKey)
if err != nil {
return nil, err
}
txAsBytes, err := tx.MarshalBinary()
if err != nil {
return nil, err
}
txAsHex := hexutil.Encode(txAsBytes)
c.logger.Info("build tx", "nonce", c.Nonce, "TxAsHex", txAsHex)
c.Nonce += 1
return tx, nil
}
...@@ -56,15 +56,16 @@ var rootCmd = &cobra.Command{ ...@@ -56,15 +56,16 @@ var rootCmd = &cobra.Command{
ClientFactory: "ethclient", ClientFactory: "ethclient",
SendTxPrivateKey: sendTxPrivateKey, SendTxPrivateKey: sendTxPrivateKey,
WebStaticDir: webStaticDir, WebStaticDir: webStaticDir,
WebsocketAddr: websocketAddr,
} }
transactor, err := multisend.NewTransactor(websocketAddr, &cfg) // transactor, err := multisend.NewTransactor(websocketAddr, &cfg)
if err != nil { // if err != nil {
panic(err) // panic(err)
} // }
transactor.Start() // transactor.Start()
go func() { go func() {
w := multisend.WebServicer{} w := multisend.WebServicer{}
......
...@@ -38,6 +38,7 @@ type Config struct { ...@@ -38,6 +38,7 @@ type Config struct {
NoTrapInterrupts bool `json:"no_trap_interrupts"` // Should we avoid trapping Ctrl+Break? Only relevant for standalone execution mode. NoTrapInterrupts bool `json:"no_trap_interrupts"` // Should we avoid trapping Ctrl+Break? Only relevant for standalone execution mode.
SendTxPrivateKey string `json:"send_tx_private_key"` // Send tx to the chain with the private key. SendTxPrivateKey string `json:"send_tx_private_key"` // Send tx to the chain with the private key.
WebStaticDir string `json:"web_static_dir"` WebStaticDir string `json:"web_static_dir"`
WebsocketAddr string
} }
var validBroadcastTxMethods = map[string]interface{}{ var validBroadcastTxMethods = map[string]interface{}{
......
...@@ -116,6 +116,7 @@ type BatchSend struct { ...@@ -116,6 +116,7 @@ type BatchSend struct {
} }
type WebServicer struct { type WebServicer struct {
transactor *Transactor
cli Client cli Client
} }
...@@ -374,6 +375,15 @@ func (web *WebServicer) WebService(config Config) error { ...@@ -374,6 +375,15 @@ func (web *WebServicer) WebService(config Config) error {
web.cli = client web.cli = client
transactor, err := NewTransactor(config.WebsocketAddr, &config)
if err != nil {
return err
}
transactor.Start()
web.transactor = transactor
handler := cors.Default().Handler(r) handler := cors.Default().Handler(r)
// Bind to a port and pass our router in // Bind to a port and pass our router in
return http.ListenAndServe(":8000", handler) return http.ListenAndServe(":8000", handler)
...@@ -547,6 +557,147 @@ type WebResp struct { ...@@ -547,6 +557,147 @@ type WebResp struct {
AllTxs []ConsTxHashs `json:"all_txs"` AllTxs []ConsTxHashs `json:"all_txs"`
} }
func (web *WebServicer) sendLoop(fromAddr string, toAddrs []string, txCount int, amount int64, id uuid.UUID, requestAmount int64) error {
addrsL := len(toAddrs)
first := true
sendTicker := time.NewTicker(time.Duration(3) * time.Second)
for {
select {
case <-sendTicker.C:
var hashesBytes []byte = make([]byte, 0, 32*batchTxHashSize)
var beginOriginalTx common.Hash
var endOriginalTx common.Hash
var sendRedisBeginTime time.Time
var beginTotal = txCount
for j := 0; j < batchTxHashSize; j++ {
var txsBytes []byte
var txs []TxWithFrom = make([]TxWithFrom, 0, batchTxSize)
txshash := make([]string, 0, batchTxSize)
for i := 0; i < batchTxSize; i++ {
var tx *types.Transaction
var err error
//fmt.Printf("param fromAddr: %s systemFromAddr: %s result: %v \n ", fromAddr, systemFromAddr, (fromAddr != systemFromAddr && first))
if fromAddr != systemFromAddr && first {
txCount++
tx, err = buildOriginalTx(originalTxParam.Nonce, common.HexToAddress(fromAddr), requestAmount, big.NewInt(256), nil)
} else {
//fmt.Printf("amount: %d idx: %d addrsL: %d \n", amount, i, addrsL)
tx, err = buildOriginalTx(originalTxParam.Nonce, common.HexToAddress(toAddrs[i%addrsL]), amount, big.NewInt(256), nil)
}
if err != nil {
return err
}
if j == i && i == 0 {
beginOriginalTx = tx.Hash()
}
endOriginalTx = tx.Hash()
originalTxParam.Nonce += 1
txAsBytes, err := tx.MarshalBinary()
if err != nil {
return err
}
txsBytes = append(txsBytes, txAsBytes...)
if fromAddr != systemFromAddr && first {
txs = append(txs, TxWithFrom{
common.HexToAddress(systemFromAddr).Bytes(),
tx})
first = false
} else {
txs = append(txs, TxWithFrom{
common.HexToAddress(fromAddr).Bytes(),
tx})
}
txshash = append(txshash, tx.Hash().String())
txCount--
if txCount == 0 {
break
}
}
h := sha256.New()
if _, err := h.Write(txsBytes); err != nil {
return err
}
hashBytes := h.Sum(nil)
hashesBytes = append(hashesBytes, hashBytes...)
batchTxs := OriginalBatchTxs{Hash: hashBytes, Txs: txs}
batchTxsForRedis <- &batchTxs
if j == 0 {
sendRedisBeginTime = time.Now()
}
if txCount == 0 {
break
}
}
tx, err := web.cli.BuildTx(&hashesBytes)
if err != nil {
return err
}
if err := web.transactor.SendTx(tx); err != nil {
return err
}
consTxWithBatchs := []ConsTxWithBatchHash{}
consTxWithBatchs = append(consTxWithBatchs, ConsTxWithBatchHash{ConsTxHash: tx.Hash().Bytes(),
BatchTxsHash: hashesBytes})
if record, ok := GetSendRecord(id); ok {
b := BatchSend{
BeginOriginalTx: beginOriginalTx,
EndOriginalTx: endOriginalTx,
SendToRedisBeginTime: sendRedisBeginTime.Unix(),
SendTxsEndTime: time.Now().Unix(),
TxNum: beginTotal - txCount,
ConsTxWithBatchHash: consTxWithBatchs,
}
record.SendRecord = append(record.SendRecord, b)
SetSendRecord(id, record)
}
if txCount == 0 {
break
}
}
if txCount == 0 {
break
}
}
return nil
}
func (web *WebServicer) ProduceTxs(fromAddr string, toAddrs []string, txCount int, amount int64, id uuid.UUID, requestAmount int64) error { func (web *WebServicer) ProduceTxs(fromAddr string, toAddrs []string, txCount int, amount int64, id uuid.UUID, requestAmount int64) error {
addrsL := len(toAddrs) addrsL := len(toAddrs)
......
...@@ -94,7 +94,6 @@ func Start(redisAddr, passwd string) { ...@@ -94,7 +94,6 @@ func Start(redisAddr, passwd string) {
} }
//fmt.Printf("batchTxHash %s listLen: %d list: %v \n", fmt.Sprintf("%x", batchTxs.Hash), len(hashstr), hashstr) //fmt.Printf("batchTxHash %s listLen: %d list: %v \n", fmt.Sprintf("%x", batchTxs.Hash), len(hashstr), hashstr)
if err := client.LPush(context.Background(), fmt.Sprintf("%x", batchTxs.Hash), reverse(hashstr)); err != nil { if err := client.LPush(context.Background(), fmt.Sprintf("%x", batchTxs.Hash), reverse(hashstr)); err != nil {
} }
......
...@@ -13,6 +13,7 @@ import ( ...@@ -13,6 +13,7 @@ import (
"code.wuban.net.cn/multisend/internal/logging" "code.wuban.net.cn/multisend/internal/logging"
"github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/common/hexutil" "github.com/ethereum/go-ethereum/common/hexutil"
"github.com/ethereum/go-ethereum/core/types"
"github.com/google/uuid" "github.com/google/uuid"
"github.com/gorilla/websocket" "github.com/gorilla/websocket"
) )
...@@ -238,6 +239,28 @@ func (t *Transactor) setStop(err error) { ...@@ -238,6 +239,28 @@ func (t *Transactor) setStop(err error) {
t.stopMtx.Unlock() t.stopMtx.Unlock()
} }
func (t *Transactor) SendTx(tx *types.Transaction) error {
data, err := tx.MarshalBinary()
if err != nil {
return err
}
args := hexutil.Encode(data)
method := "eth_sendRawTransaction"
msg, err := t.newMessage(method, args)
if err != nil {
return err
}
//op := &requestOp{ids: []json.RawMessage{msg.ID}, resp: make(chan *jsonrpcMessage, 1)}
//return ec.c.CallContext(ctx, nil, "eth_sendRawTransaction", hexutil.Encode(data))
if err := t.writeTx(msg); err != nil {
return err
}
return nil
}
func (t *Transactor) sendTransactions() error { func (t *Transactor) sendTransactions() error {
// send as many transactions as we can, up to the send rate // send as many transactions as we can, up to the send rate
totalSent := t.GetTxCount() totalSent := t.GetTxCount()
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment