Commit c66bf6c0 authored by 李伟@五瓣科技's avatar 李伟@五瓣科技

commit tx ok

parent 8b626404
package multisend
import (
"context"
"crypto/ecdsa"
"crypto/md5"
"fmt"
"math/big"
"github.com/ethereum/go-ethereum"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/ethclient"
)
var privatekey string = "a1994419e9b06a7b27e8d094840ae26a6b7806633bb8be55a1a835f1620d8cec"
var toAddress common.Address = common.HexToAddress("0x4592d8f8d7b001e72cb26a73e4fa1806a51ac79d")
// var data []byte = make([]byte, 0, md5.Size*(1000))
// func init() {
// privatekey = "a1994419e9b06a7b27e8d094840ae26a6b7806633bb8be55a1a835f1620d8cec"
// for i := 0; i < 8140; i++ {
// hash := md5.Sum([]byte(fmt.Sprintf("%d", i)))
// data = append(data, hash[:]...)
// }
// }
type EthClientFactory struct{}
type EthClient struct {
NodeRpcURL string
}
var _ ClientFactory = (*EthClientFactory)(nil)
var _ Client = (*EthClient)(nil)
func init() {
if err := RegisterClientFactory("ethclient", NewEthClientFactory()); err != nil {
panic(err)
}
}
func NewEthClientFactory() *EthClientFactory {
return &EthClientFactory{}
}
func (f *EthClientFactory) ValidateConfig(cfg Config) error {
return nil
}
func (f *EthClientFactory) NewClient(cfg Config) (Client, error) {
return &EthClient{
NodeRpcURL: "http://13.40.31.153:8545",
}, nil
}
func (c *EthClient) GenerateTx() (*types.Transaction, error) {
privateKeyAsECDSA, err := crypto.HexToECDSA(privatekey)
if err != nil {
return nil, err
}
publicKey := privateKeyAsECDSA.Public()
publicKeyECDSA, ok := publicKey.(*ecdsa.PublicKey)
if !ok {
return nil, fmt.Errorf("publicKey.(*ecdsa.PublicKey) not ok")
}
fromAddress := crypto.PubkeyToAddress(*publicKeyECDSA)
client, err := ethclient.Dial(c.NodeRpcURL)
if err != nil {
return nil, err
}
nonce, err := client.PendingNonceAt(context.Background(), fromAddress)
if err != nil {
return nil, err
}
gasPrice, err := client.SuggestGasPrice(context.Background())
if err != nil {
return nil, err
}
chainID, err := client.NetworkID(context.Background())
if err != nil {
return nil, err
}
txs, md5data, err := getBatchTx(8000, c.NodeRpcURL, false)
if err != nil {
return nil, err
}
_ = txs
gasLimit, err := client.EstimateGas(context.Background(), ethereum.CallMsg{
To: &toAddress,
Data: md5data,
})
tx := types.NewTransaction(nonce, toAddress, big.NewInt(10000000000000), gasLimit, gasPrice, md5data)
signedTx, err := types.SignTx(tx, types.NewEIP155Signer(chainID), privateKeyAsECDSA)
if err != nil {
return nil, err
}
return signedTx, nil
}
func getBatchTx(txNum int, nodeUrl string, signed bool) ([]*types.Transaction, []byte, error) {
res := make([]*types.Transaction, 0, txNum)
md5Data := make([]byte, 0, md5.Size*(txNum))
//publicKey := privateKey.Public()
// publicKeyECDSA, ok := publicKey.(*ecdsa.PublicKey)
// if !ok {
// log.Fatal("cannot assert type: publicKey is not of type *ecdsa.PublicKey")
// }
// fromAddress := crypto.PubkeyToAddress(*publicKeyECDSA)
// cli, err := ethclient.Dial(nodeUrl)
// if err != nil {
// log.Fatal(err)
// }
// nonce, err := cli.PendingNonceAt(context.Background(), fromAddress)
// if err != nil {
// log.Fatal(err)
// }
// gasLimit, err := cli.EstimateGas(context.Background(), ethereum.CallMsg{
// To: &toAddress,
// Data: data,
// })
// gasPrice, err := cli.SuggestGasPrice(context.Background())
// if err != nil {
// log.Fatal(err)
// }
// chainID, _ := cli.NetworkID(context.Background())
// fmt.Printf("gasLimit: %v gasPrice: %v chainID: %v \n", 4178026, 1000000000, 256)
privateKeyAsECDSA, err := crypto.HexToECDSA(privatekey)
if err != nil {
return nil, nil, err
}
for i := 0; i < txNum; i++ {
tx := types.NewTransaction(0+uint64(i), toAddress, big.NewInt(10000000000000), 4178026, big.NewInt(1000000000), nil)
if signed {
signedTx, err := types.SignTx(tx, types.NewEIP155Signer(big.NewInt(256)), privateKeyAsECDSA)
//signedTx, err := types.SignTx(tx, types.NewEIP155Signer(chainID), privateKey)
if err != nil {
return nil, nil, err
}
tx = signedTx
}
txAsBytes, err := tx.MarshalBinary()
if err != nil {
return nil, nil, err
}
hash := md5.Sum(txAsBytes)
md5Data = append(md5Data, hash[:]...)
res = append(res, tx)
}
return res, md5Data, nil
}
package multisend package multisend
import ( // import (
"context" // "context"
"fmt" // "fmt"
"net" // "net"
"net/url" // "net/url"
"os" // "os"
"os/signal" // "os/signal"
"syscall" // "syscall"
"time" // "time"
"code.wuban.net.cn/multisend/internal/logging" // "code.wuban.net.cn/multisend/internal/logging"
"github.com/ethereum/go-ethereum/ethclient" // "github.com/ethereum/go-ethereum/ethclient"
"github.com/ethereum/go-ethereum/rpc" // "github.com/ethereum/go-ethereum/rpc"
) // )
type ethPeerInfo struct { // type ethPeerInfo struct {
Addr string // Addr string
Client ethclient.Client // Client ethclient.Client
PeerAddrs []string // PeerAddrs []string
SuccessfullyQueried bool // SuccessfullyQueried bool
} // }
func waitForEthNetworkPeers( // func waitForEthNetworkPeers(
endpoints []string, // endpoints []string,
selectionMethod string, // selectionMethod string,
minDiscoveredPeers int, // minDiscoveredPeers int,
minPeerConnectivity int, // minPeerConnectivity int,
maxReturnedPeers int, // maxReturnedPeers int,
timeout time.Duration, // timeout time.Duration,
logger logging.Logger, // logger logging.Logger,
) ([]string, error) { // ) ([]string, error) {
logger.Info("waiting for eth public node to connect", // logger.Info("waiting for eth public node to connect",
"endpoints", endpoints, // "endpoints", endpoints,
"selectionMethod", selectionMethod, // "selectionMethod", selectionMethod,
"minDiscoveredPeers", minDiscoveredPeers, // "minDiscoveredPeers", minDiscoveredPeers,
"minPeerConnectivity", minPeerConnectivity, // "minPeerConnectivity", minPeerConnectivity,
"maxReturnedPeers", maxReturnedPeers, // "maxReturnedPeers", maxReturnedPeers,
"timeout", fmt.Sprintf("%.2f seconds", timeout.Seconds())) // "timeout", fmt.Sprintf("%.2f seconds", timeout.Seconds()))
cancelc := make(chan struct{}, 1) // cancelc := make(chan struct{}, 1)
cancelTrap := trapInterrupts(func() { close(cancelc) }, logger) // cancelTrap := trapInterrupts(func() { close(cancelc) }, logger)
defer close(cancelTrap) // defer close(cancelTrap)
startTime := time.Now() // startTime := time.Now()
suppliedPeers := make(map[string]*ethPeerInfo) // suppliedPeers := make(map[string]*ethPeerInfo)
for _, peerURL := range endpoints { // for _, peerURL := range endpoints {
u, err := url.Parse(peerURL) // u, err := url.Parse(peerURL)
if err != nil { // if err != nil {
return nil, fmt.Errorf("failed to parse peer URL %s: %s", peerURL, err) // return nil, fmt.Errorf("failed to parse peer URL %s: %s", peerURL, err)
} // }
peerIP, err := lookupFirstIPv4Addr(u.Hostname()) // peerIP, err := lookupFirstIPv4Addr(u.Hostname())
if err != nil { // if err != nil {
return nil, fmt.Errorf("failed to resolve IP address for endpoint %s: %s", peerURL, err) // return nil, fmt.Errorf("failed to resolve IP address for endpoint %s: %s", peerURL, err)
} // }
peerAddr := fmt.Sprintf("http://%s:8546", peerIP) // peerAddr := fmt.Sprintf("http://%s:8546", peerIP)
client, err := rpc.DialWebsocket(context.Background(), peerAddr, "") // client, err := rpc.DialWebsocket(context.Background(), peerAddr, "")
if err != nil { // if err != nil {
return nil, err // return nil, err
} // }
suppliedPeers[peerAddr] = &ethPeerInfo{ // suppliedPeers[peerAddr] = &ethPeerInfo{
Addr: peerAddr, // Addr: peerAddr,
Client: client, // Client: client,
PeerAddrs: make([]string, 0), // PeerAddrs: make([]string, 0),
} // }
} // }
peers := make(map[string]*ethPeerInfo) // peers := make(map[string]*ethPeerInfo)
for a, p := range suppliedPeers { // for a, p := range suppliedPeers {
pc := *p // pc := *p
peers[a] = &pc // peers[a] = &pc
} // }
for { // for {
remainingTimeout := timeout - time.Since(startTime) // remainingTimeout := timeout - time.Since(startTime)
if remainingTimeout < 0 { // if remainingTimeout < 0 {
return nil, fmt.Errorf("timed out waiting for Tendermint peer crawl to complete") // return nil, fmt.Errorf("timed out waiting for Tendermint peer crawl to complete")
} // }
newPeers, err := getEthNetworkPeers(peers, remainingTimeout, cancelc, logger) // newPeers, err := getEthNetworkPeers(peers, remainingTimeout, cancelc, logger)
if err != nil { // if err != nil {
return nil, err // return nil, err
} // }
// we only care if we've discovered more peers than in the previous attempt // // we only care if we've discovered more peers than in the previous attempt
if len(newPeers) > len(peers) { // if len(newPeers) > len(peers) {
peers = newPeers // peers = newPeers
} // }
peerCount := len(peers) // peerCount := len(peers)
peerConnectivity := getMinPeerConnectivity(peers) // peerConnectivity := getMinPeerConnectivity(peers)
if peerCount >= minDiscoveredPeers && peerConnectivity >= minPeerConnectivity { // if peerCount >= minDiscoveredPeers && peerConnectivity >= minPeerConnectivity {
logger.Info("All required peers connected", "count", peerCount, "minConnectivity", minPeerConnectivity) // logger.Info("All required peers connected", "count", peerCount, "minConnectivity", minPeerConnectivity)
// we're done here // // we're done here
return filterEthPeerMap(suppliedPeers, peers, selectionMethod, maxReturnedPeers), nil // return filterEthPeerMap(suppliedPeers, peers, selectionMethod, maxReturnedPeers), nil
} else { // } else {
logger.Debug( // logger.Debug(
"Peers discovered so far", // "Peers discovered so far",
"count", peerCount, // "count", peerCount,
"minConnectivity", peerConnectivity, // "minConnectivity", peerConnectivity,
"remainingTimeout", timeout-time.Since(startTime), // "remainingTimeout", timeout-time.Since(startTime),
) // )
time.Sleep(1 * time.Second) // time.Sleep(1 * time.Second)
} // }
} // }
return nil, nil // return nil, nil
} // }
func trapInterrupts(onKill func(), logger logging.Logger) chan struct{} { // func trapInterrupts(onKill func(), logger logging.Logger) chan struct{} {
sigc := make(chan os.Signal, 1) // sigc := make(chan os.Signal, 1)
cancelTrap := make(chan struct{}) // cancelTrap := make(chan struct{})
signal.Notify(sigc, os.Interrupt, syscall.SIGTERM) // signal.Notify(sigc, os.Interrupt, syscall.SIGTERM)
go func() { // go func() {
select { // select {
case <-sigc: // case <-sigc:
logger.Info("Caught kill signal") // logger.Info("Caught kill signal")
onKill() // onKill()
case <-cancelTrap: // case <-cancelTrap:
logger.Debug("Interrupt trap cancelled") // logger.Debug("Interrupt trap cancelled")
} // }
}() // }()
return cancelTrap // return cancelTrap
} // }
func lookupFirstIPv4Addr(hostname string) (string, error) { // func lookupFirstIPv4Addr(hostname string) (string, error) {
ipRecords, err := net.LookupIP(hostname) // ipRecords, err := net.LookupIP(hostname)
if err != nil { // if err != nil {
return "", err // return "", err
} // }
for _, ipRecord := range ipRecords { // for _, ipRecord := range ipRecords {
ipv4 := ipRecord.To4() // ipv4 := ipRecord.To4()
if ipv4 != nil { // if ipv4 != nil {
return ipv4.String(), nil // return ipv4.String(), nil
} // }
} // }
return "", fmt.Errorf("no IPv4 records for hostname: %s", hostname) // return "", fmt.Errorf("no IPv4 records for hostname: %s", hostname)
} // }
// Queries the given peers (in parallel) to construct a unique set of known // // Queries the given peers (in parallel) to construct a unique set of known
// peers across the entire network. // // peers across the entire network.
func getEthNetworkPeers( // func getEthNetworkPeers(
peers map[string]*ethPeerInfo, // Any existing peers we know about already // peers map[string]*ethPeerInfo, // Any existing peers we know about already
timeout time.Duration, // Maximum timeout for the entire operation // timeout time.Duration, // Maximum timeout for the entire operation
cancelc chan struct{}, // Allows us to cancel the polling operations // cancelc chan struct{}, // Allows us to cancel the polling operations
logger logging.Logger, // logger logging.Logger,
) (map[string]*ethPeerInfo, error) { // ) (map[string]*ethPeerInfo, error) {
startTime := time.Now() // startTime := time.Now()
peerInfoc := make(chan *ethPeerInfo, len(peers)) // peerInfoc := make(chan *ethPeerInfo, len(peers))
errc := make(chan error, len(peers)) // errc := make(chan error, len(peers))
logger.Debug("Querying peers for more peers", "count", len(peers), "peers", getPeerAddrs(peers)) // logger.Debug("Querying peers for more peers", "count", len(peers), "peers", getPeerAddrs(peers))
// parallelize querying all the Tendermint nodes' peers // // parallelize querying all the Tendermint nodes' peers
for _, peer := range peers { // for _, peer := range peers {
// reset this every time // // reset this every time
peer.SuccessfullyQueried = false // peer.SuccessfullyQueried = false
go func(peer_ *ethPeerInfo) { // go func(peer_ *ethPeerInfo) {
netInfo, err := peer_.Client.NetInfo(context.Background()) // netInfo, err := peer_.Client.NetInfo(context.Background())
if err != nil { // if err != nil {
logger.Debug("Failed to query peer - skipping", "addr", peer_.Addr, "err", err) // logger.Debug("Failed to query peer - skipping", "addr", peer_.Addr, "err", err)
errc <- err // errc <- err
return // return
} // }
peerAddrs := make([]string, 0) // peerAddrs := make([]string, 0)
for _, peerInfo := range netInfo.Peers { // for _, peerInfo := range netInfo.Peers {
peerAddrs = append(peerAddrs, fmt.Sprintf("http://%s:8546", peerInfo.RemoteIP)) // peerAddrs = append(peerAddrs, fmt.Sprintf("http://%s:8546", peerInfo.RemoteIP))
} // }
peerInfoc <- &ethPeerInfo{ // peerInfoc <- &ethPeerInfo{
Addr: peer_.Addr, // Addr: peer_.Addr,
Client: peer_.Client, // Client: peer_.Client,
PeerAddrs: peerAddrs, // PeerAddrs: peerAddrs,
SuccessfullyQueried: true, // SuccessfullyQueried: true,
} // }
}(peer) // }(peer)
} // }
result := make(map[string]*ethPeerInfo) // result := make(map[string]*ethPeerInfo)
expectedNetInfoResults := len(peers) // expectedNetInfoResults := len(peers)
receivedNetInfoResults := 0 // receivedNetInfoResults := 0
for { // for {
remainingTimeout := timeout - time.Since(startTime) // remainingTimeout := timeout - time.Since(startTime)
if remainingTimeout < 0 { // if remainingTimeout < 0 {
return nil, fmt.Errorf("timed out waiting for all peer network info to be returned") // return nil, fmt.Errorf("timed out waiting for all peer network info to be returned")
} // }
select { // select {
case <-cancelc: // case <-cancelc:
return nil, fmt.Errorf("cancel signal received") // return nil, fmt.Errorf("cancel signal received")
case peerInfo := <-peerInfoc: // case peerInfo := <-peerInfoc:
result[peerInfo.Addr] = peerInfo // result[peerInfo.Addr] = peerInfo
receivedNetInfoResults++ // receivedNetInfoResults++
case <-errc: // case <-errc:
receivedNetInfoResults++ // receivedNetInfoResults++
case <-time.After(remainingTimeout): // case <-time.After(remainingTimeout):
return nil, fmt.Errorf("timed out while waiting for all peer network info to be returned") // return nil, fmt.Errorf("timed out while waiting for all peer network info to be returned")
} // }
if receivedNetInfoResults >= expectedNetInfoResults { // if receivedNetInfoResults >= expectedNetInfoResults {
return resolveTendermintPeerMap(result), nil // return resolveTendermintPeerMap(result), nil
} else { // } else {
// wait a little before polling again // // wait a little before polling again
time.Sleep(1 * time.Second) // time.Sleep(1 * time.Second)
} // }
} // }
} // }
func resolveTendermintPeerMap(peers map[string]*ethPeerInfo) map[string]*ethPeerInfo { // func resolveTendermintPeerMap(peers map[string]*ethPeerInfo) map[string]*ethPeerInfo {
result := make(map[string]*ethPeerInfo) // result := make(map[string]*ethPeerInfo)
for addr, peer := range peers { // for addr, peer := range peers {
result[addr] = peer // result[addr] = peer
for _, peerAddr := range peer.PeerAddrs { // for _, peerAddr := range peer.PeerAddrs {
client, err := rpc.DialWebsocket(context.Background(), peerAddr, "") // client, err := rpc.DialWebsocket(context.Background(), peerAddr, "")
if err != nil { // if err != nil {
return nil // return nil
} // }
if _, exists := result[peerAddr]; !exists { // if _, exists := result[peerAddr]; !exists {
result[peerAddr] = &ethPeerInfo{ // result[peerAddr] = &ethPeerInfo{
Addr: peerAddr, // Addr: peerAddr,
Client: client, // Client: client,
PeerAddrs: make([]string, 0), // PeerAddrs: make([]string, 0),
} // }
} // }
} // }
} // }
return result // return result
} // }
func getPeerAddrs(peers map[string]*ethPeerInfo) []string { // func getPeerAddrs(peers map[string]*ethPeerInfo) []string {
results := make([]string, 0) // results := make([]string, 0)
for _, peer := range peers { // for _, peer := range peers {
results = append(results, peer.Addr) // results = append(results, peer.Addr)
} // }
return results // return results
} // }
func getMinPeerConnectivity(peers map[string]*ethPeerInfo) int { // func getMinPeerConnectivity(peers map[string]*ethPeerInfo) int {
minPeers := len(peers) // minPeers := len(peers)
for _, peer := range peers { // for _, peer := range peers {
// we only care about peers we've successfully queried so far // // we only care about peers we've successfully queried so far
if !peer.SuccessfullyQueried { // if !peer.SuccessfullyQueried {
continue // continue
} // }
peerCount := len(peer.PeerAddrs) // peerCount := len(peer.PeerAddrs)
if peerCount > 0 && peerCount < minPeers { // if peerCount > 0 && peerCount < minPeers {
minPeers = peerCount // minPeers = peerCount
} // }
} // }
return minPeers // return minPeers
} // }
func filterEthPeerMap(suppliedPeers, newPeers map[string]*ethPeerInfo, selectionMethod string, maxCount int) []string { // func filterEthPeerMap(suppliedPeers, newPeers map[string]*ethPeerInfo, selectionMethod string, maxCount int) []string {
result := make([]string, 0) // result := make([]string, 0)
for peerAddr := range newPeers { // for peerAddr := range newPeers {
u, err := url.Parse(peerAddr) // u, err := url.Parse(peerAddr)
if err != nil { // if err != nil {
continue // continue
} // }
addr := fmt.Sprintf("ws://%s:8546", u.Hostname()) // addr := fmt.Sprintf("ws://%s:8546", u.Hostname())
switch selectionMethod { // switch selectionMethod {
case SelectSuppliedEndpoints: // case SelectSuppliedEndpoints:
// only add it to the result if it was in the original list // // only add it to the result if it was in the original list
if _, ok := suppliedPeers[peerAddr]; ok { // if _, ok := suppliedPeers[peerAddr]; ok {
result = append(result, addr) // result = append(result, addr)
} // }
case SelectDiscoveredEndpoints: // case SelectDiscoveredEndpoints:
// only add it to the result if it wasn't in the original list // // only add it to the result if it wasn't in the original list
if _, ok := suppliedPeers[peerAddr]; !ok { // if _, ok := suppliedPeers[peerAddr]; !ok {
result = append(result, addr) // result = append(result, addr)
} // }
default: // default:
// otherwise, always add it // // otherwise, always add it
result = append(result, addr) // result = append(result, addr)
} // }
if len(result) >= maxCount { // if len(result) >= maxCount {
break // break
} // }
} // }
return result // return result
} // }
...@@ -17,15 +17,9 @@ ...@@ -17,15 +17,9 @@
package multisend package multisend
import ( import (
"bytes"
"context"
"encoding/json" "encoding/json"
"errors"
"fmt" "fmt"
"io"
"reflect"
"strings" "strings"
"sync"
"time" "time"
) )
...@@ -39,6 +33,20 @@ const ( ...@@ -39,6 +33,20 @@ const (
defaultWriteTimeout = 10 * time.Second // used if context has no deadline defaultWriteTimeout = 10 * time.Second // used if context has no deadline
) )
const defaultErrorCode = -32000
// Error wraps RPC errors, which contain an error code in addition to the message.
type Error interface {
Error() string // returns the message
ErrorCode() int // returns the code
}
// A DataError contains some data in addition to the error message.
type DataError interface {
Error() string // returns the message
ErrorData() interface{} // returns the error data
}
var null = json.RawMessage("null") var null = json.RawMessage("null")
type subscriptionResult struct { type subscriptionResult struct {
...@@ -143,205 +151,205 @@ func (err *jsonError) ErrorData() interface{} { ...@@ -143,205 +151,205 @@ func (err *jsonError) ErrorData() interface{} {
return err.Data return err.Data
} }
// Conn is a subset of the methods of net.Conn which are sufficient for ServerCodec. // // Conn is a subset of the methods of net.Conn which are sufficient for ServerCodec.
type Conn interface { // type Conn interface {
io.ReadWriteCloser // io.ReadWriteCloser
SetWriteDeadline(time.Time) error // SetWriteDeadline(time.Time) error
} // }
type deadlineCloser interface { // type deadlineCloser interface {
io.Closer // io.Closer
SetWriteDeadline(time.Time) error // SetWriteDeadline(time.Time) error
} // }
// ConnRemoteAddr wraps the RemoteAddr operation, which returns a description // // ConnRemoteAddr wraps the RemoteAddr operation, which returns a description
// of the peer address of a connection. If a Conn also implements ConnRemoteAddr, this // // of the peer address of a connection. If a Conn also implements ConnRemoteAddr, this
// description is used in log messages. // // description is used in log messages.
type ConnRemoteAddr interface { // type ConnRemoteAddr interface {
RemoteAddr() string // RemoteAddr() string
} // }
// jsonCodec reads and writes JSON-RPC messages to the underlying connection. It also has // // jsonCodec reads and writes JSON-RPC messages to the underlying connection. It also has
// support for parsing arguments and serializing (result) objects. // // support for parsing arguments and serializing (result) objects.
type jsonCodec struct { // type jsonCodec struct {
remote string // remote string
closer sync.Once // close closed channel once // closer sync.Once // close closed channel once
closeCh chan interface{} // closed on Close // closeCh chan interface{} // closed on Close
decode func(v interface{}) error // decoder to allow multiple transports // decode func(v interface{}) error // decoder to allow multiple transports
encMu sync.Mutex // guards the encoder // encMu sync.Mutex // guards the encoder
encode func(v interface{}) error // encoder to allow multiple transports // encode func(v interface{}) error // encoder to allow multiple transports
conn deadlineCloser // conn deadlineCloser
} // }
// NewFuncCodec creates a codec which uses the given functions to read and write. If conn // // NewFuncCodec creates a codec which uses the given functions to read and write. If conn
// implements ConnRemoteAddr, log messages will use it to include the remote address of // // implements ConnRemoteAddr, log messages will use it to include the remote address of
// the connection. // // the connection.
func NewFuncCodec(conn deadlineCloser, encode, decode func(v interface{}) error) ServerCodec { // func NewFuncCodec(conn deadlineCloser, encode, decode func(v interface{}) error) ServerCodec {
codec := &jsonCodec{ // codec := &jsonCodec{
closeCh: make(chan interface{}), // closeCh: make(chan interface{}),
encode: encode, // encode: encode,
decode: decode, // decode: decode,
conn: conn, // conn: conn,
} // }
if ra, ok := conn.(ConnRemoteAddr); ok { // if ra, ok := conn.(ConnRemoteAddr); ok {
codec.remote = ra.RemoteAddr() // codec.remote = ra.RemoteAddr()
} // }
return codec // return codec
} // }
// NewCodec creates a codec on the given connection. If conn implements ConnRemoteAddr, log // // NewCodec creates a codec on the given connection. If conn implements ConnRemoteAddr, log
// messages will use it to include the remote address of the connection. // // messages will use it to include the remote address of the connection.
func NewCodec(conn Conn) ServerCodec { // func NewCodec(conn Conn) ServerCodec {
enc := json.NewEncoder(conn) // enc := json.NewEncoder(conn)
dec := json.NewDecoder(conn) // dec := json.NewDecoder(conn)
dec.UseNumber() // dec.UseNumber()
return NewFuncCodec(conn, enc.Encode, dec.Decode) // return NewFuncCodec(conn, enc.Encode, dec.Decode)
} // }
func (c *jsonCodec) peerInfo() PeerInfo { // func (c *jsonCodec) peerInfo() PeerInfo {
// This returns "ipc" because all other built-in transports have a separate codec type. // // This returns "ipc" because all other built-in transports have a separate codec type.
return PeerInfo{Transport: "ipc", RemoteAddr: c.remote} // return PeerInfo{Transport: "ipc", RemoteAddr: c.remote}
} // }
func (c *jsonCodec) remoteAddr() string { // func (c *jsonCodec) remoteAddr() string {
return c.remote // return c.remote
} // }
func (c *jsonCodec) readBatch() (messages []*jsonrpcMessage, batch bool, err error) { // func (c *jsonCodec) readBatch() (messages []*jsonrpcMessage, batch bool, err error) {
// Decode the next JSON object in the input stream. // // Decode the next JSON object in the input stream.
// This verifies basic syntax, etc. // // This verifies basic syntax, etc.
var rawmsg json.RawMessage // var rawmsg json.RawMessage
if err := c.decode(&rawmsg); err != nil { // if err := c.decode(&rawmsg); err != nil {
return nil, false, err // return nil, false, err
} // }
messages, batch = parseMessage(rawmsg) // messages, batch = parseMessage(rawmsg)
for i, msg := range messages { // for i, msg := range messages {
if msg == nil { // if msg == nil {
// Message is JSON 'null'. Replace with zero value so it // // Message is JSON 'null'. Replace with zero value so it
// will be treated like any other invalid message. // // will be treated like any other invalid message.
messages[i] = new(jsonrpcMessage) // messages[i] = new(jsonrpcMessage)
} // }
} // }
return messages, batch, nil // return messages, batch, nil
} // }
func (c *jsonCodec) writeJSON(ctx context.Context, v interface{}) error { // func (c *jsonCodec) writeJSON(ctx context.Context, v interface{}) error {
c.encMu.Lock() // c.encMu.Lock()
defer c.encMu.Unlock() // defer c.encMu.Unlock()
deadline, ok := ctx.Deadline() // deadline, ok := ctx.Deadline()
if !ok { // if !ok {
deadline = time.Now().Add(defaultWriteTimeout) // deadline = time.Now().Add(defaultWriteTimeout)
} // }
c.conn.SetWriteDeadline(deadline) // c.conn.SetWriteDeadline(deadline)
return c.encode(v) // return c.encode(v)
} // }
func (c *jsonCodec) close() { // func (c *jsonCodec) close() {
c.closer.Do(func() { // c.closer.Do(func() {
close(c.closeCh) // close(c.closeCh)
c.conn.Close() // c.conn.Close()
}) // })
} // }
// Closed returns a channel which will be closed when Close is called // // Closed returns a channel which will be closed when Close is called
func (c *jsonCodec) closed() <-chan interface{} { // func (c *jsonCodec) closed() <-chan interface{} {
return c.closeCh // return c.closeCh
} // }
// parseMessage parses raw bytes as a (batch of) JSON-RPC message(s). There are no error // // parseMessage parses raw bytes as a (batch of) JSON-RPC message(s). There are no error
// checks in this function because the raw message has already been syntax-checked when it // // checks in this function because the raw message has already been syntax-checked when it
// is called. Any non-JSON-RPC messages in the input return the zero value of // // is called. Any non-JSON-RPC messages in the input return the zero value of
// jsonrpcMessage. // // jsonrpcMessage.
func parseMessage(raw json.RawMessage) ([]*jsonrpcMessage, bool) { // func parseMessage(raw json.RawMessage) ([]*jsonrpcMessage, bool) {
if !isBatch(raw) { // if !isBatch(raw) {
msgs := []*jsonrpcMessage{{}} // msgs := []*jsonrpcMessage{{}}
json.Unmarshal(raw, &msgs[0]) // json.Unmarshal(raw, &msgs[0])
return msgs, false // return msgs, false
} // }
dec := json.NewDecoder(bytes.NewReader(raw)) // dec := json.NewDecoder(bytes.NewReader(raw))
dec.Token() // skip '[' // dec.Token() // skip '['
var msgs []*jsonrpcMessage // var msgs []*jsonrpcMessage
for dec.More() { // for dec.More() {
msgs = append(msgs, new(jsonrpcMessage)) // msgs = append(msgs, new(jsonrpcMessage))
dec.Decode(&msgs[len(msgs)-1]) // dec.Decode(&msgs[len(msgs)-1])
} // }
return msgs, true // return msgs, true
} // }
// isBatch returns true when the first non-whitespace characters is '[' // // isBatch returns true when the first non-whitespace characters is '['
func isBatch(raw json.RawMessage) bool { // func isBatch(raw json.RawMessage) bool {
for _, c := range raw { // for _, c := range raw {
// skip insignificant whitespace (http://www.ietf.org/rfc/rfc4627.txt) // // skip insignificant whitespace (http://www.ietf.org/rfc/rfc4627.txt)
if c == 0x20 || c == 0x09 || c == 0x0a || c == 0x0d { // if c == 0x20 || c == 0x09 || c == 0x0a || c == 0x0d {
continue // continue
} // }
return c == '[' // return c == '['
} // }
return false // return false
} // }
// parsePositionalArguments tries to parse the given args to an array of values with the // // parsePositionalArguments tries to parse the given args to an array of values with the
// given types. It returns the parsed values or an error when the args could not be // // given types. It returns the parsed values or an error when the args could not be
// parsed. Missing optional arguments are returned as reflect.Zero values. // // parsed. Missing optional arguments are returned as reflect.Zero values.
func parsePositionalArguments(rawArgs json.RawMessage, types []reflect.Type) ([]reflect.Value, error) { // func parsePositionalArguments(rawArgs json.RawMessage, types []reflect.Type) ([]reflect.Value, error) {
dec := json.NewDecoder(bytes.NewReader(rawArgs)) // dec := json.NewDecoder(bytes.NewReader(rawArgs))
var args []reflect.Value // var args []reflect.Value
tok, err := dec.Token() // tok, err := dec.Token()
switch { // switch {
case err == io.EOF || tok == nil && err == nil: // case err == io.EOF || tok == nil && err == nil:
// "params" is optional and may be empty. Also allow "params":null even though it's // // "params" is optional and may be empty. Also allow "params":null even though it's
// not in the spec because our own client used to send it. // // not in the spec because our own client used to send it.
case err != nil: // case err != nil:
return nil, err // return nil, err
case tok == json.Delim('['): // case tok == json.Delim('['):
// Read argument array. // // Read argument array.
if args, err = parseArgumentArray(dec, types); err != nil { // if args, err = parseArgumentArray(dec, types); err != nil {
return nil, err // return nil, err
} // }
default: // default:
return nil, errors.New("non-array args") // return nil, errors.New("non-array args")
} // }
// Set any missing args to nil. // // Set any missing args to nil.
for i := len(args); i < len(types); i++ { // for i := len(args); i < len(types); i++ {
if types[i].Kind() != reflect.Ptr { // if types[i].Kind() != reflect.Ptr {
return nil, fmt.Errorf("missing value for required argument %d", i) // return nil, fmt.Errorf("missing value for required argument %d", i)
} // }
args = append(args, reflect.Zero(types[i])) // args = append(args, reflect.Zero(types[i]))
} // }
return args, nil // return args, nil
} // }
func parseArgumentArray(dec *json.Decoder, types []reflect.Type) ([]reflect.Value, error) { // func parseArgumentArray(dec *json.Decoder, types []reflect.Type) ([]reflect.Value, error) {
args := make([]reflect.Value, 0, len(types)) // args := make([]reflect.Value, 0, len(types))
for i := 0; dec.More(); i++ { // for i := 0; dec.More(); i++ {
if i >= len(types) { // if i >= len(types) {
return args, fmt.Errorf("too many arguments, want at most %d", len(types)) // return args, fmt.Errorf("too many arguments, want at most %d", len(types))
} // }
argval := reflect.New(types[i]) // argval := reflect.New(types[i])
if err := dec.Decode(argval.Interface()); err != nil { // if err := dec.Decode(argval.Interface()); err != nil {
return args, fmt.Errorf("invalid argument %d: %v", i, err) // return args, fmt.Errorf("invalid argument %d: %v", i, err)
} // }
if argval.IsNil() && types[i].Kind() != reflect.Ptr { // if argval.IsNil() && types[i].Kind() != reflect.Ptr {
return args, fmt.Errorf("missing value for required argument %d", i) // return args, fmt.Errorf("missing value for required argument %d", i)
} // }
args = append(args, argval.Elem()) // args = append(args, argval.Elem())
} // }
// Read end of args array. // // Read end of args array.
_, err := dec.Token() // _, err := dec.Token()
return args, err // return args, err
} // }
// parseSubscriptionName extracts the subscription name from an encoded argument array. // // parseSubscriptionName extracts the subscription name from an encoded argument array.
func parseSubscriptionName(rawArgs json.RawMessage) (string, error) { // func parseSubscriptionName(rawArgs json.RawMessage) (string, error) {
dec := json.NewDecoder(bytes.NewReader(rawArgs)) // dec := json.NewDecoder(bytes.NewReader(rawArgs))
if tok, _ := dec.Token(); tok != json.Delim('[') { // if tok, _ := dec.Token(); tok != json.Delim('[') {
return "", errors.New("non-array args") // return "", errors.New("non-array args")
} // }
v, _ := dec.Token() // v, _ := dec.Token()
method, ok := v.(string) // method, ok := v.(string)
if !ok { // if !ok {
return "", errors.New("expected subscription name as first argument") // return "", errors.New("expected subscription name as first argument")
} // }
return method, nil // return method, nil
} // }
package multisend package multisend
import ( // import (
"time" // "time"
"code.wuban.net.cn/multisend/internal/logging" // "code.wuban.net.cn/multisend/internal/logging"
) // )
func ExecuteStandalone(cfg Config) error { // func ExecuteStandalone(cfg Config) error {
logger := logging.NewLogrusLogger("loadtest") // logger := logging.NewLogrusLogger("loadtest")
if cfg.ExpectPeers > 0 { // if cfg.ExpectPeers > 0 {
peers, err := waitForEthNetworkPeers( // peers, err := waitForEthNetworkPeers(
cfg.Endpoints, // cfg.Endpoints,
cfg.EndpointSelectMethod, // cfg.EndpointSelectMethod,
cfg.ExpectPeers, // cfg.ExpectPeers,
cfg.MinConnectivity, // cfg.MinConnectivity,
cfg.MaxEndpoints, // cfg.MaxEndpoints,
time.Duration(cfg.PeerConnectTimeout)*time.Second, // time.Duration(cfg.PeerConnectTimeout)*time.Second,
logger, // logger,
) // )
if err != nil { // if err != nil {
logger.Error("Failed while waiting for peers to connect", "err", err) // logger.Error("Failed while waiting for peers to connect", "err", err)
return err // return err
} // }
cfg.Endpoints = peers // cfg.Endpoints = peers
} // }
logger.Info("Connecting to remote endpoints") // logger.Info("Connecting to remote endpoints")
tg := NewTransactorGroup() // tg := NewTransactorGroup()
if err := tg.AddAll(&cfg); err != nil { // if err := tg.AddAll(&cfg); err != nil {
return err // return err
} // }
logger.Info("Initiating load test") // logger.Info("Initiating load test")
tg.Start() // tg.Start()
var cancelTrap chan struct{} // var cancelTrap chan struct{}
if !cfg.NoTrapInterrupts { // if !cfg.NoTrapInterrupts {
// we want to know if the user hits Ctrl+Break // // we want to know if the user hits Ctrl+Break
cancelTrap = trapInterrupts(func() { tg.Cancel() }, logger) // cancelTrap = trapInterrupts(func() { tg.Cancel() }, logger)
defer close(cancelTrap) // defer close(cancelTrap)
} else { // } else {
logger.Debug("Skipping trapping of interrupts (e.g. Ctrl+Break)") // logger.Debug("Skipping trapping of interrupts (e.g. Ctrl+Break)")
} // }
if err := tg.Wait(); err != nil { // if err := tg.Wait(); err != nil {
logger.Error("Failed to execute load test", "err", err) // logger.Error("Failed to execute load test", "err", err)
return err // return err
} // }
logger.Info("Load test complete!") // logger.Info("Load test complete!")
return nil // return nil
} // }
...@@ -13,16 +13,11 @@ import ( ...@@ -13,16 +13,11 @@ import (
"code.wuban.net.cn/multisend/internal/logging" "code.wuban.net.cn/multisend/internal/logging"
"github.com/ethereum/go-ethereum/common/hexutil" "github.com/ethereum/go-ethereum/common/hexutil"
"github.com/gorilla/websocket" "github.com/gorilla/websocket"
//rpctypes "github.com/tendermint/tendermint/rpc/jsonrpc/types"
) )
const ( const (
connSendTimeout = 10 * time.Second connSendTimeout = 10 * time.Second
// see https://github.com/tendermint/tendermint/blob/master/rpc/lib/server/handlers.go
connPingPeriod = (30 * 9 / 10) * time.Second connPingPeriod = (30 * 9 / 10) * time.Second
//jsonRPCID = rpctypes.JSONRPCStringID("tm-load-test")
defaultProgressCallbackInterval = 5 * time.Second defaultProgressCallbackInterval = 5 * time.Second
) )
...@@ -90,7 +85,7 @@ func NewTransactor(remoteAddr string, config *Config) (*Transactor, error) { ...@@ -90,7 +85,7 @@ func NewTransactor(remoteAddr string, config *Config) (*Transactor, error) {
client: client, client: client,
logger: logger, logger: logger,
conn: conn, conn: conn,
broadcastTxMethod: "broadcast_tx_" + config.BroadcastTxMethod, //broadcastTxMethod: "broadcast_tx_" + config.BroadcastTxMethod,
progressCallbackInterval: defaultProgressCallbackInterval, progressCallbackInterval: defaultProgressCallbackInterval,
}, nil }, nil
} }
...@@ -178,7 +173,6 @@ func (t *Transactor) sendLoop() { ...@@ -178,7 +173,6 @@ func (t *Transactor) sendLoop() {
} }
return err return err
}) })
pingTicker := time.NewTicker(connPingPeriod) pingTicker := time.NewTicker(connPingPeriod)
timeLimitTicker := time.NewTicker(time.Duration(t.config.Time) * time.Second) timeLimitTicker := time.NewTicker(time.Duration(t.config.Time) * time.Second)
sendTicker := time.NewTicker(time.Duration(t.config.SendPeriod) * time.Second) sendTicker := time.NewTicker(time.Duration(t.config.SendPeriod) * time.Second)
...@@ -223,12 +217,6 @@ func (t *Transactor) sendLoop() { ...@@ -223,12 +217,6 @@ func (t *Transactor) sendLoop() {
} }
func (t *Transactor) writeTx(msg interface{}) error { func (t *Transactor) writeTx(msg interface{}) error {
// txBase64 := base64.StdEncoding.EncodeToString(tx)
// paramsJSON, err := json.Marshal(map[string]interface{}{"tx": txBase64})
// if err != nil {
// return err
// }
// _ = t.conn.SetWriteDeadline(time.Now().Add(connSendTimeout))
err := t.conn.WriteJSON(msg) err := t.conn.WriteJSON(msg)
return err return err
} }
...@@ -248,13 +236,6 @@ func (t *Transactor) setStop(err error) { ...@@ -248,13 +236,6 @@ func (t *Transactor) setStop(err error) {
t.stopMtx.Unlock() t.stopMtx.Unlock()
} }
// type requestOp struct {
// ids []json.RawMessage
// err error
// resp chan *jsonrpcMessage // receives up to len(ids) responses
// sub *ClientSubscription // only set for EthSubscribe requests
// }
func (t *Transactor) sendTransactions() error { func (t *Transactor) sendTransactions() error {
// send as many transactions as we can, up to the send rate // send as many transactions as we can, up to the send rate
totalSent := t.GetTxCount() totalSent := t.GetTxCount()
...@@ -285,11 +266,10 @@ func (t *Transactor) sendTransactions() error { ...@@ -285,11 +266,10 @@ func (t *Transactor) sendTransactions() error {
args := hexutil.Encode(data) args := hexutil.Encode(data)
method := "eth_sendRawTransaction" method := "eth_sendRawTransaction"
msg, err := t.newMessage(method, args...) msg, err := t.newMessage(method, args)
if err != nil { if err != nil {
return err return err
} }
//op := &requestOp{ids: []json.RawMessage{msg.ID}, resp: make(chan *jsonrpcMessage, 1)} //op := &requestOp{ids: []json.RawMessage{msg.ID}, resp: make(chan *jsonrpcMessage, 1)}
//return ec.c.CallContext(ctx, nil, "eth_sendRawTransaction", hexutil.Encode(data)) //return ec.c.CallContext(ctx, nil, "eth_sendRawTransaction", hexutil.Encode(data))
if err := t.writeTx(msg); err != nil { if err := t.writeTx(msg); err != nil {
......
...@@ -2,8 +2,29 @@ package multisend ...@@ -2,8 +2,29 @@ package multisend
import ( import (
"testing" "testing"
"time"
) )
func TestTransactor(t *testing.T) { func TestTransactor(t *testing.T) {
cfg := Config{
Rate: 10,
Count: 10,
Connections: 1,
Time: 60,
SendPeriod: 10,
ClientFactory: "ethclient",
}
transactor, err := NewTransactor("ws://13.40.31.153:8546", &cfg)
if err != nil {
t.Error(err)
}
//transactor.sendTransactions()
//transactor.sendLoop()
transactor.Start()
time.Sleep(time.Second * 60)
} }
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment