Commit 521b6f8d authored by duanjinfei's avatar duanjinfei

batch broadcastTx test

parent f1fb7079
...@@ -134,7 +134,6 @@ func (m *Sha3Request) GetData() []byte { ...@@ -134,7 +134,6 @@ func (m *Sha3Request) GetData() []byte {
} }
type Sha3Response struct { type Sha3Response struct {
//
Hash *github_com_CaduceusMetaverseProtocol_MetaTypes_types.Hash `protobuf:"bytes,1,opt,name=hash,proto3,customtype=github.com/CaduceusMetaverseProtocol/MetaTypes/types.Hash" json:"hash,omitempty"` Hash *github_com_CaduceusMetaverseProtocol_MetaTypes_types.Hash `protobuf:"bytes,1,opt,name=hash,proto3,customtype=github.com/CaduceusMetaverseProtocol/MetaTypes/types.Hash" json:"hash,omitempty"`
XXX_NoUnkeyedLiteral struct{} `json:"-"` XXX_NoUnkeyedLiteral struct{} `json:"-"`
XXX_unrecognized []byte `json:"-"` XXX_unrecognized []byte `json:"-"`
......
...@@ -24,7 +24,8 @@ const _ = grpc.SupportPackageIsVersion7 ...@@ -24,7 +24,8 @@ const _ = grpc.SupportPackageIsVersion7
// For semantics around ctx use and closing/ending streaming RPCs, please refer to https://pkg.go.dev/google.golang.org/grpc/?tab=doc#ClientConn.NewStream. // For semantics around ctx use and closing/ending streaming RPCs, please refer to https://pkg.go.dev/google.golang.org/grpc/?tab=doc#ClientConn.NewStream.
type RingServiceClient interface { type RingServiceClient interface {
Ping(ctx context.Context, in *RequestPing, opts ...grpc.CallOption) (*ResponsePing, error) Ping(ctx context.Context, in *RequestPing, opts ...grpc.CallOption) (*ResponsePing, error)
//rpc BroadcastTx(RequestBroadcastTx) returns (ResponseBroadcastTx); // rpc BroadcastTx(RequestBroadcastTx) returns (ResponseBroadcastTx);
BroadcastTxs(ctx context.Context, in *BroadcastEthTxWithFromRequests, opts ...grpc.CallOption) (*ResponseBroadcastTxs, error)
BroadcastTx(ctx context.Context, in *BroadcastEthTxWithFromRequest, opts ...grpc.CallOption) (*ResponseBroadcastTx, error) BroadcastTx(ctx context.Context, in *BroadcastEthTxWithFromRequest, opts ...grpc.CallOption) (*ResponseBroadcastTx, error)
// web3 // web3
Sha3(ctx context.Context, in *Sha3Request, opts ...grpc.CallOption) (*Sha3Response, error) Sha3(ctx context.Context, in *Sha3Request, opts ...grpc.CallOption) (*Sha3Response, error)
...@@ -101,6 +102,15 @@ func (c *ringServiceClient) Ping(ctx context.Context, in *RequestPing, opts ...g ...@@ -101,6 +102,15 @@ func (c *ringServiceClient) Ping(ctx context.Context, in *RequestPing, opts ...g
return out, nil return out, nil
} }
func (c *ringServiceClient) BroadcastTxs(ctx context.Context, in *BroadcastEthTxWithFromRequests, opts ...grpc.CallOption) (*ResponseBroadcastTxs, error) {
out := new(ResponseBroadcastTxs)
err := c.cc.Invoke(ctx, "/ring.v1.RingService/BroadcastTxs", in, out, opts...)
if err != nil {
return nil, err
}
return out, nil
}
func (c *ringServiceClient) BroadcastTx(ctx context.Context, in *BroadcastEthTxWithFromRequest, opts ...grpc.CallOption) (*ResponseBroadcastTx, error) { func (c *ringServiceClient) BroadcastTx(ctx context.Context, in *BroadcastEthTxWithFromRequest, opts ...grpc.CallOption) (*ResponseBroadcastTx, error) {
out := new(ResponseBroadcastTx) out := new(ResponseBroadcastTx)
err := c.cc.Invoke(ctx, "/ring.v1.RingService/BroadcastTx", in, out, opts...) err := c.cc.Invoke(ctx, "/ring.v1.RingService/BroadcastTx", in, out, opts...)
...@@ -538,7 +548,8 @@ func (c *ringServiceClient) Logs(ctx context.Context, in *LogsRequest, opts ...g ...@@ -538,7 +548,8 @@ func (c *ringServiceClient) Logs(ctx context.Context, in *LogsRequest, opts ...g
// for forward compatibility // for forward compatibility
type RingServiceServer interface { type RingServiceServer interface {
Ping(context.Context, *RequestPing) (*ResponsePing, error) Ping(context.Context, *RequestPing) (*ResponsePing, error)
//rpc BroadcastTx(RequestBroadcastTx) returns (ResponseBroadcastTx); // rpc BroadcastTx(RequestBroadcastTx) returns (ResponseBroadcastTx);
BroadcastTxs(context.Context, *BroadcastEthTxWithFromRequests) (*ResponseBroadcastTxs, error)
BroadcastTx(context.Context, *BroadcastEthTxWithFromRequest) (*ResponseBroadcastTx, error) BroadcastTx(context.Context, *BroadcastEthTxWithFromRequest) (*ResponseBroadcastTx, error)
// web3 // web3
Sha3(context.Context, *Sha3Request) (*Sha3Response, error) Sha3(context.Context, *Sha3Request) (*Sha3Response, error)
...@@ -606,6 +617,9 @@ type UnimplementedRingServiceServer struct { ...@@ -606,6 +617,9 @@ type UnimplementedRingServiceServer struct {
func (UnimplementedRingServiceServer) Ping(context.Context, *RequestPing) (*ResponsePing, error) { func (UnimplementedRingServiceServer) Ping(context.Context, *RequestPing) (*ResponsePing, error) {
return nil, status.Errorf(codes.Unimplemented, "method Ping not implemented") return nil, status.Errorf(codes.Unimplemented, "method Ping not implemented")
} }
func (UnimplementedRingServiceServer) BroadcastTxs(context.Context, *BroadcastEthTxWithFromRequests) (*ResponseBroadcastTxs, error) {
return nil, status.Errorf(codes.Unimplemented, "method BroadcastTxs not implemented")
}
func (UnimplementedRingServiceServer) BroadcastTx(context.Context, *BroadcastEthTxWithFromRequest) (*ResponseBroadcastTx, error) { func (UnimplementedRingServiceServer) BroadcastTx(context.Context, *BroadcastEthTxWithFromRequest) (*ResponseBroadcastTx, error) {
return nil, status.Errorf(codes.Unimplemented, "method BroadcastTx not implemented") return nil, status.Errorf(codes.Unimplemented, "method BroadcastTx not implemented")
} }
...@@ -781,6 +795,24 @@ func _RingService_Ping_Handler(srv interface{}, ctx context.Context, dec func(in ...@@ -781,6 +795,24 @@ func _RingService_Ping_Handler(srv interface{}, ctx context.Context, dec func(in
return interceptor(ctx, in, info, handler) return interceptor(ctx, in, info, handler)
} }
func _RingService_BroadcastTxs_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
in := new(BroadcastEthTxWithFromRequests)
if err := dec(in); err != nil {
return nil, err
}
if interceptor == nil {
return srv.(RingServiceServer).BroadcastTxs(ctx, in)
}
info := &grpc.UnaryServerInfo{
Server: srv,
FullMethod: "/ring.v1.RingService/BroadcastTxs",
}
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
return srv.(RingServiceServer).BroadcastTxs(ctx, req.(*BroadcastEthTxWithFromRequests))
}
return interceptor(ctx, in, info, handler)
}
func _RingService_BroadcastTx_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { func _RingService_BroadcastTx_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
in := new(BroadcastEthTxWithFromRequest) in := new(BroadcastEthTxWithFromRequest)
if err := dec(in); err != nil { if err := dec(in); err != nil {
...@@ -1656,6 +1688,10 @@ var RingService_ServiceDesc = grpc.ServiceDesc{ ...@@ -1656,6 +1688,10 @@ var RingService_ServiceDesc = grpc.ServiceDesc{
MethodName: "Ping", MethodName: "Ping",
Handler: _RingService_Ping_Handler, Handler: _RingService_Ping_Handler,
}, },
{
MethodName: "BroadcastTxs",
Handler: _RingService_BroadcastTxs_Handler,
},
{ {
MethodName: "BroadcastTx", MethodName: "BroadcastTx",
Handler: _RingService_BroadcastTx_Handler, Handler: _RingService_BroadcastTx_Handler,
......
...@@ -9,10 +9,10 @@ import ( ...@@ -9,10 +9,10 @@ import (
) )
var ( var (
initAcc, initCmp, batchSign, batchRecover, batchRecoverTx, batchVerify bool initAcc, initCmp, broadcastTxArr bool
txCount, goRoutineCount, cmpAmount, batchCount int txCount, goRoutineCount, cmpAmount, batchCount, startCount, endCount int
cpuProfile string cpuProfile string
cfg *tool.Config cfg *tool.Config
) )
func init() { func init() {
...@@ -20,12 +20,11 @@ func init() { ...@@ -20,12 +20,11 @@ func init() {
initCmd.PersistentFlags().BoolVar(&initAcc, "initAcc", false, "Start after initializing the account") initCmd.PersistentFlags().BoolVar(&initAcc, "initAcc", false, "Start after initializing the account")
initCmd.PersistentFlags().BoolVar(&initCmp, "initCmp", false, "Start after initializing the account cmp balance") initCmd.PersistentFlags().BoolVar(&initCmp, "initCmp", false, "Start after initializing the account cmp balance")
startCmd.PersistentFlags().StringVar(&cpuProfile, "cpuProfile", "cpuProfile.prof", "Statistics cpu profile") startCmd.PersistentFlags().StringVar(&cpuProfile, "cpuProfile", "cpuProfile.prof", "Statistics cpu profile")
startCmd.PersistentFlags().BoolVar(&batchSign, "batchSign", false, "test grpc interface -> batchSign") startCmd.PersistentFlags().IntVar(&startCount, "startCount", 0, "read excel start count")
startCmd.PersistentFlags().BoolVar(&batchRecover, "batchRecover", false, "test grpc interface -> batchRecover") startCmd.PersistentFlags().IntVar(&endCount, "endCount", 100, "read excel end count")
startCmd.PersistentFlags().BoolVar(&batchRecoverTx, "batchRecoverTx", false, "test grpc interface -> batchRecoverTx") startCmd.PersistentFlags().BoolVar(&broadcastTxArr, "broadcastTxArr", false, "test grpc interface -> broadcastTxArr")
startCmd.PersistentFlags().BoolVar(&batchVerify, "batchVerify", false, "test grpc interface -> batchVerify")
startCmd.PersistentFlags().IntVar(&txCount, "txCount", 1000, "send tran count") startCmd.PersistentFlags().IntVar(&txCount, "txCount", 1000, "send tran count")
startCmd.PersistentFlags().IntVar(&goRoutineCount, "goRoutineCount", 100, "send tran goRoutine count") startCmd.PersistentFlags().IntVar(&goRoutineCount, "goRoutineCount", 10, "send tran goRoutine count")
startCmd.PersistentFlags().IntVar(&batchCount, "batchCount", 100, "batch send tran count") startCmd.PersistentFlags().IntVar(&batchCount, "batchCount", 100, "batch send tran count")
startCmd.AddCommand(initCmd) startCmd.AddCommand(initCmd)
} }
......
...@@ -18,35 +18,17 @@ func init() { ...@@ -18,35 +18,17 @@ func init() {
return return
} }
cfg.StorageAccFileName += ".xlsx" cfg.StorageAccFileName += ".xlsx"
SendTxAccountArr = tool.ReadExcel(cfg.Count, cfg.StorageAccFileName)
} }
func startTest() { func startTest() {
SendTxAccountArr = tool.ReadExcelOfStartEnd(startCount, endCount, cfg.StorageAccFileName)
log.Infof("Program start initAccCount:%d tranCount", cfg.Count) log.Infof("Program start initAccCount:%d tranCount", cfg.Count)
cfg.GoRoutineCount = goRoutineCount cfg.GoRoutineCount = goRoutineCount
cfg.SignCount = txCount / 100 cfg.SignCount = txCount / 100
cfg.BatchCount = batchCount cfg.BatchCount = batchCount
arr, fromAddrArr := transaction.SignedTxArr(SendTxAccountArr, cfg) arr := transaction.SignedTxArr(SendTxAccountArr, cfg)
if batchSign { if broadcastTxArr {
if err := transaction.BatchSignHandler(arr, cfg); err != nil { if err := transaction.BroadcastTxArr(arr, cfg); err != nil {
log.Errorf("Bath Send Tran error: %s", err)
return
}
}
if batchRecover {
if err := transaction.BatchRecoverHandler(arr, cfg); err != nil {
log.Errorf("Bath Send Tran error: %s", err)
return
}
}
if batchRecoverTx {
if err := transaction.BatchRecoverTxHandler(arr, fromAddrArr, cfg); err != nil {
log.Errorf("Bath Send Tran error: %s", err)
return
}
}
if batchVerify {
if err := transaction.BatchVerifyHandler(arr, cfg); err != nil {
log.Errorf("Bath Send Tran error: %s", err) log.Errorf("Bath Send Tran error: %s", err)
return return
} }
......
...@@ -141,7 +141,7 @@ func ReadExcelOfStartEnd(start int, end int, fileName string) [][]string { ...@@ -141,7 +141,7 @@ func ReadExcelOfStartEnd(start int, end int, fileName string) [][]string {
return nil return nil
} }
res := make([][]string, 0) res := make([][]string, 0)
for i := start; i <= end; i++ { for i := start; i < end; i++ {
res = append(res, rows[i]) res = append(res, rows[i])
} }
return res return res
......
This diff is collapsed.
...@@ -4,12 +4,17 @@ import ( ...@@ -4,12 +4,17 @@ import (
"ChainGrpcTest/log" "ChainGrpcTest/log"
"ChainGrpcTest/tool" "ChainGrpcTest/tool"
"ChainGrpcTest/txcache" "ChainGrpcTest/txcache"
"context"
"crypto/ecdsa" "crypto/ecdsa"
ring "github.com/CaduceusMetaverseProtocol/MetaProtocol/gen/proto/go/ring/v1"
metatypes "github.com/CaduceusMetaverseProtocol/MetaTypes/types"
"github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/crypto" "github.com/ethereum/go-ethereum/crypto"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
"math/big" "math/big"
"sync" "sync/atomic"
"time" "time"
) )
...@@ -28,6 +33,12 @@ type Transactor struct { ...@@ -28,6 +33,12 @@ type Transactor struct {
receivedAddr common.Address receivedAddr common.Address
} }
var (
tran = make(chan *Transactor, 0)
tranArr = make([]*types.Transaction, 0)
batchSignCount int32
)
func newTransactor(cfg TranConfig) (*Transactor, error) { func newTransactor(cfg TranConfig) (*Transactor, error) {
signerKey, err := crypto.HexToECDSA(cfg.PrivateKey) signerKey, err := crypto.HexToECDSA(cfg.PrivateKey)
if err != nil { if err != nil {
...@@ -50,17 +61,29 @@ func newTransactor(cfg TranConfig) (*Transactor, error) { ...@@ -50,17 +61,29 @@ func newTransactor(cfg TranConfig) (*Transactor, error) {
return &res, nil return &res, nil
} }
var accountsNonceMap sync.Map
// SignedTxArr 获取全部签名数据 // SignedTxArr 获取全部签名数据
func SignedTxArr(sendTxAccountArr [][]string, cfg *tool.Config) ([]*types.Transaction, []common.Address) { func SignedTxArr(sendTxAccountArr [][]string, cfg *tool.Config) []*types.Transaction {
tranArr := make([]*types.Transaction, 0) for i := 0; i < cfg.GoRoutineCount; i++ {
fromAddrArr := make([]common.Address, 0) go signedTxFunc()
var signedTx *types.Transaction }
client, err := grpc.Dial(cfg.RpcNode, grpc.WithTransportCredentials(insecure.NewCredentials()))
if err != nil {
log.Error("grpc dial error:", err)
return nil
}
defer client.Close()
serviceClient := ring.NewRingServiceClient(client)
for _, rows := range sendTxAccountArr { for _, rows := range sendTxAccountArr {
fromAddrArr = append(fromAddrArr, common.HexToAddress(rows[0]))
privateKey := rows[1] privateKey := rows[1]
nonce := big.NewInt(1) fromAddr := metatypes.HexToAddress(rows[0])
nonceReq := &ring.NonceRequest{
Address: &fromAddr,
}
response, err := serviceClient.Nonce(context.Background(), nonceReq, nil)
if err != nil {
return nil
}
nonce := new(big.Int).SetUint64(response.Nonce)
for signCount := 0; signCount < cfg.SignCount; signCount++ { for signCount := 0; signCount < cfg.SignCount; signCount++ {
tranCfg := TranConfig{ tranCfg := TranConfig{
Amount: cfg.Amount, Amount: cfg.Amount,
...@@ -71,35 +94,44 @@ func SignedTxArr(sendTxAccountArr [][]string, cfg *tool.Config) ([]*types.Transa ...@@ -71,35 +94,44 @@ func SignedTxArr(sendTxAccountArr [][]string, cfg *tool.Config) ([]*types.Transa
Nonce: nonce, Nonce: nonce,
} }
t, err := newTransactor(tranCfg) t, err := newTransactor(tranCfg)
signedTx, err = t.signedTx() if err != nil {
txcache.Add(signedTx.Hash().Hex(), rows[0])
nonce = big.NewInt(1).Add(nonce, big.NewInt(1))
if err != nil || signedTx == nil {
log.Errorf("signed tx error %s ", err) log.Errorf("signed tx error %s ", err)
continue continue
} }
tranArr = append(tranArr, signedTx) tran <- t
nonce = big.NewInt(1).Add(nonce, big.NewInt(1))
}
}
for {
if len(sendTxAccountArr)*cfg.SignCount == int(batchSignCount) {
return tranArr
} }
} }
return tranArr, fromAddrArr
} }
// signedTx 签名本币转账交易 // signedTxFunc 签名本币转账交易
func (t *Transactor) signedTx() (*types.Transaction, error) { func signedTxFunc() (*types.Transaction, error) {
txData := types.LegacyTx{ for {
Nonce: t.config.Nonce.Uint64(), select {
To: &t.receivedAddr, case t := <-tran:
Value: big.NewInt(t.config.Amount), txData := types.LegacyTx{
Gas: 300000, Nonce: t.config.Nonce.Uint64(),
GasPrice: big.NewInt(1000000001), To: &t.receivedAddr,
Data: nil, Value: big.NewInt(t.config.Amount),
} Gas: 300000,
newtx := types.NewTx(&txData) GasPrice: big.NewInt(1000000001),
signedTx, err := types.SignTx(newtx, types.NewEIP155Signer(big.NewInt(t.config.ChainId)), t.signerKey) Data: nil,
if err != nil { }
log.Errorf("Send tx nonce: %d , From: %s , to: %s , error: %s", t.config.Nonce, crypto.PubkeyToAddress(t.signerKey.PublicKey), t.receivedAddr, err.Error()) newtx := types.NewTx(&txData)
time.Sleep(time.Second) signedTx, err := types.SignTx(newtx, types.NewEIP155Signer(big.NewInt(t.config.ChainId)), t.signerKey)
return nil, err if err != nil {
log.Errorf("Send tx nonce: %d , From: %s , to: %s , error: %s", t.config.Nonce, crypto.PubkeyToAddress(t.signerKey.PublicKey), t.receivedAddr, err.Error())
time.Sleep(time.Second)
return nil, err
}
txcache.Add(signedTx.Hash().Hex(), t.sender.Hex())
tranArr = append(tranArr, signedTx)
atomic.AddInt32(&batchSignCount, 1)
}
} }
return signedTx, nil
} }
...@@ -6,7 +6,7 @@ import ( ...@@ -6,7 +6,7 @@ import (
) )
var ( var (
gtxcache = &TxCache{ gtxCache = &TxCache{
cache: make(map[string]string), cache: make(map[string]string),
} }
) )
...@@ -16,18 +16,25 @@ type TxCache struct { ...@@ -16,18 +16,25 @@ type TxCache struct {
mux sync.RWMutex mux sync.RWMutex
} }
func Add(txhash string, address string) { func Add(txHash string, address string) {
gtxcache.mux.Lock() gtxCache.mux.Lock()
defer gtxcache.mux.Unlock() defer gtxCache.mux.Unlock()
gtxcache.cache[txhash] = address gtxCache.cache[txHash] = address
} }
func CompareAddress(txhash string, newer string) bool { func CompareAddress(txHash string, newer string) bool {
gtxcache.mux.RLock() gtxCache.mux.RLock()
defer gtxcache.mux.RUnlock() defer gtxCache.mux.RUnlock()
o, exist := gtxcache.cache[txhash] o, exist := gtxCache.cache[txHash]
if !exist { if !exist {
return false return false
} }
return strings.Compare(newer, o) == 0 return strings.Compare(newer, o) == 0
} }
func GetFromAddr(txHash string) (string, bool) {
gtxCache.mux.RLock()
defer gtxCache.mux.RUnlock()
o, exist := gtxCache.cache[txHash]
return o, exist
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment