Commit 8b626404 authored by 李伟@五瓣科技's avatar 李伟@五瓣科技

send tx with websocket

parent 7df426d9
This diff is collapsed.
package multisend package multisend
import ( import (
"context" "encoding/json"
"fmt" "fmt"
"net"
"net/url" "net/url"
"strconv"
"sync" "sync"
"sync/atomic"
"time" "time"
"code.wuban.net.cn/multisend/internal/logging" "code.wuban.net.cn/multisend/internal/logging"
"github.com/ethereum/go-ethereum/rpc" "github.com/ethereum/go-ethereum/common/hexutil"
"github.com/gorilla/websocket" "github.com/gorilla/websocket"
//"github.com/gorilla/websocket"
//rpctypes "github.com/tendermint/tendermint/rpc/jsonrpc/types" //rpctypes "github.com/tendermint/tendermint/rpc/jsonrpc/types"
) )
...@@ -19,7 +21,7 @@ const ( ...@@ -19,7 +21,7 @@ const (
// see https://github.com/tendermint/tendermint/blob/master/rpc/lib/server/handlers.go // see https://github.com/tendermint/tendermint/blob/master/rpc/lib/server/handlers.go
connPingPeriod = (30 * 9 / 10) * time.Second connPingPeriod = (30 * 9 / 10) * time.Second
jsonRPCID = rpctypes.JSONRPCStringID("tm-load-test") //jsonRPCID = rpctypes.JSONRPCStringID("tm-load-test")
defaultProgressCallbackInterval = 5 * time.Second defaultProgressCallbackInterval = 5 * time.Second
) )
...@@ -30,9 +32,11 @@ type Transactor struct { ...@@ -30,9 +32,11 @@ type Transactor struct {
remoteAddr string // The full URL of the remote WebSockets endpoint. remoteAddr string // The full URL of the remote WebSockets endpoint.
config *Config // The configuration for the load test. config *Config // The configuration for the load test.
idCounter uint32
client Client client Client
logger logging.Logger logger logging.Logger
conn rpc.Client conn *websocket.Conn
broadcastTxMethod string broadcastTxMethod string
wg sync.WaitGroup wg sync.WaitGroup
...@@ -165,15 +169,15 @@ func (t *Transactor) receiveLoop() { ...@@ -165,15 +169,15 @@ func (t *Transactor) receiveLoop() {
func (t *Transactor) sendLoop() { func (t *Transactor) sendLoop() {
defer t.wg.Done() defer t.wg.Done()
// t.conn.SetPingHandler(func(message string) error { t.conn.SetPingHandler(func(message string) error {
// err := t.conn.WriteControl(websocket.PongMessage, []byte(message), time.Now().Add(connSendTimeout)) err := t.conn.WriteControl(websocket.PongMessage, []byte(message), time.Now().Add(connSendTimeout))
// if err == websocket.ErrCloseSent { if err == websocket.ErrCloseSent {
// return nil return nil
// } else if e, ok := err.(net.Error); ok && e.Temporary() { } else if e, ok := err.(net.Error); ok && e.Temporary() {
// return nil return nil
// } }
// return err return err
// }) })
pingTicker := time.NewTicker(connPingPeriod) pingTicker := time.NewTicker(connPingPeriod)
timeLimitTicker := time.NewTicker(time.Duration(t.config.Time) * time.Second) timeLimitTicker := time.NewTicker(time.Duration(t.config.Time) * time.Second)
...@@ -218,20 +222,16 @@ func (t *Transactor) sendLoop() { ...@@ -218,20 +222,16 @@ func (t *Transactor) sendLoop() {
} }
} }
// func (t *Transactor) writeTx(tx []byte) error { func (t *Transactor) writeTx(msg interface{}) error {
// txBase64 := base64.StdEncoding.EncodeToString(tx) // txBase64 := base64.StdEncoding.EncodeToString(tx)
// paramsJSON, err := json.Marshal(map[string]interface{}{"tx": txBase64}) // paramsJSON, err := json.Marshal(map[string]interface{}{"tx": txBase64})
// if err != nil { // if err != nil {
// return err // return err
// } // }
// _ = t.conn.SetWriteDeadline(time.Now().Add(connSendTimeout)) // _ = t.conn.SetWriteDeadline(time.Now().Add(connSendTimeout))
// return t.conn.WriteJSON(rpctypes.RPCRequest{ err := t.conn.WriteJSON(msg)
// JSONRPC: "2.0", return err
// ID: jsonRPCID, }
// Method: t.broadcastTxMethod,
// Params: json.RawMessage(paramsJSON),
// })
// }
func (t *Transactor) mustStop() bool { func (t *Transactor) mustStop() bool {
t.stopMtx.RLock() t.stopMtx.RLock()
...@@ -248,6 +248,13 @@ func (t *Transactor) setStop(err error) { ...@@ -248,6 +248,13 @@ func (t *Transactor) setStop(err error) {
t.stopMtx.Unlock() t.stopMtx.Unlock()
} }
// type requestOp struct {
// ids []json.RawMessage
// err error
// resp chan *jsonrpcMessage // receives up to len(ids) responses
// sub *ClientSubscription // only set for EthSubscribe requests
// }
func (t *Transactor) sendTransactions() error { func (t *Transactor) sendTransactions() error {
// send as many transactions as we can, up to the send rate // send as many transactions as we can, up to the send rate
totalSent := t.GetTxCount() totalSent := t.GetTxCount()
...@@ -270,37 +277,27 @@ func (t *Transactor) sendTransactions() error { ...@@ -270,37 +277,27 @@ func (t *Transactor) sendTransactions() error {
return err return err
} }
t.conn.BatchCallContext() data, err := tx.MarshalBinary()
if err := t.conn.SendTransaction(context.Background(), tx); err != nil { if err != nil {
return err return err
} }
// batch := []BatchElem{ args := hexutil.Encode(data)
// { method := "eth_sendRawTransaction"
// Method: "test_echo",
// Args: []interface{}{"hello", 10, &echoArgs{"world"}}, msg, err := t.newMessage(method, args...)
// Result: new(echoResult),
// },
// {
// Method: "test_echo",
// Args: []interface{}{"hello2", 11, &echoArgs{"world"}},
// Result: new(echoResult),
// },
// {
// Method: "no_such_method",
// Args: []interface{}{1, 2, 3},
// Result: new(int),
// },
// }
// if err := client.BatchCall(batch); err != nil {
// t.Fatal(err)
// }
txAsBytes, err := tx.MarshalBinary()
if err != nil { if err != nil {
return err return err
} }
sentBytes += int64(len(txAsBytes))
//op := &requestOp{ids: []json.RawMessage{msg.ID}, resp: make(chan *jsonrpcMessage, 1)}
//return ec.c.CallContext(ctx, nil, "eth_sendRawTransaction", hexutil.Encode(data))
if err := t.writeTx(msg); err != nil {
return err
}
sentBytes += int64(len(data))
//return ec.c.CallContext(ctx, nil, "eth_sendRawTransaction", hexutil.Encode(data))
// if we have to make way for the next batch // if we have to make way for the next batch
if time.Since(batchStartTime) >= time.Duration(t.config.SendPeriod)*time.Second { if time.Since(batchStartTime) >= time.Duration(t.config.SendPeriod)*time.Second {
break break
...@@ -309,6 +306,22 @@ func (t *Transactor) sendTransactions() error { ...@@ -309,6 +306,22 @@ func (t *Transactor) sendTransactions() error {
return nil return nil
} }
func (c *Transactor) nextID() json.RawMessage {
id := atomic.AddUint32(&c.idCounter, 1)
return strconv.AppendUint(nil, uint64(id), 10)
}
func (c *Transactor) newMessage(method string, paramsIn ...interface{}) (*jsonrpcMessage, error) {
msg := &jsonrpcMessage{Version: vsn, ID: c.nextID(), Method: method}
if paramsIn != nil { // prevent sending "params":null
var err error
if msg.Params, err = json.Marshal(paramsIn); err != nil {
return nil, err
}
}
return msg, nil
}
func (t *Transactor) trackStartTime() { func (t *Transactor) trackStartTime() {
t.statsMtx.Lock() t.statsMtx.Lock()
t.startTime = time.Now() t.startTime = time.Now()
......
package multisend
import (
"testing"
)
func TestTransactor(t *testing.T) {
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment