Commit fbbba6ed authored by 李伟@五瓣科技's avatar 李伟@五瓣科技

encode with proto buffer

parent ae2ca46c
// Code generated by protoc-gen-go. DO NOT EDIT.
// versions:
// protoc-gen-go v1.27.1
// protoc v3.11.2
// source: batchtx.proto
package multisend
import (
protoreflect "google.golang.org/protobuf/reflect/protoreflect"
protoimpl "google.golang.org/protobuf/runtime/protoimpl"
reflect "reflect"
sync "sync"
)
const (
// Verify that this generated code is sufficiently up-to-date.
_ = protoimpl.EnforceVersion(20 - protoimpl.MinVersion)
// Verify that runtime/protoimpl is sufficiently up-to-date.
_ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20)
)
type Tx struct {
state protoimpl.MessageState
sizeCache protoimpl.SizeCache
unknownFields protoimpl.UnknownFields
From []byte `protobuf:"bytes,1,opt,name=from,proto3" json:"from,omitempty"`
To []byte `protobuf:"bytes,2,opt,name=to,proto3" json:"to,omitempty"`
Amount int64 `protobuf:"varint,3,opt,name=amount,proto3" json:"amount,omitempty"`
}
func (x *Tx) Reset() {
*x = Tx{}
if protoimpl.UnsafeEnabled {
mi := &file_batchtx_proto_msgTypes[0]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
}
func (x *Tx) String() string {
return protoimpl.X.MessageStringOf(x)
}
func (*Tx) ProtoMessage() {}
func (x *Tx) ProtoReflect() protoreflect.Message {
mi := &file_batchtx_proto_msgTypes[0]
if protoimpl.UnsafeEnabled && x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
ms.StoreMessageInfo(mi)
}
return ms
}
return mi.MessageOf(x)
}
// Deprecated: Use Tx.ProtoReflect.Descriptor instead.
func (*Tx) Descriptor() ([]byte, []int) {
return file_batchtx_proto_rawDescGZIP(), []int{0}
}
func (x *Tx) GetFrom() []byte {
if x != nil {
return x.From
}
return nil
}
func (x *Tx) GetTo() []byte {
if x != nil {
return x.To
}
return nil
}
func (x *Tx) GetAmount() int64 {
if x != nil {
return x.Amount
}
return 0
}
type BatchTx struct {
state protoimpl.MessageState
sizeCache protoimpl.SizeCache
unknownFields protoimpl.UnknownFields
Hash []byte `protobuf:"bytes,1,opt,name=hash,proto3" json:"hash,omitempty"`
Txs []*Tx `protobuf:"bytes,2,rep,name=txs,proto3" json:"txs,omitempty"`
}
func (x *BatchTx) Reset() {
*x = BatchTx{}
if protoimpl.UnsafeEnabled {
mi := &file_batchtx_proto_msgTypes[1]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
}
func (x *BatchTx) String() string {
return protoimpl.X.MessageStringOf(x)
}
func (*BatchTx) ProtoMessage() {}
func (x *BatchTx) ProtoReflect() protoreflect.Message {
mi := &file_batchtx_proto_msgTypes[1]
if protoimpl.UnsafeEnabled && x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
ms.StoreMessageInfo(mi)
}
return ms
}
return mi.MessageOf(x)
}
// Deprecated: Use BatchTx.ProtoReflect.Descriptor instead.
func (*BatchTx) Descriptor() ([]byte, []int) {
return file_batchtx_proto_rawDescGZIP(), []int{1}
}
func (x *BatchTx) GetHash() []byte {
if x != nil {
return x.Hash
}
return nil
}
func (x *BatchTx) GetTxs() []*Tx {
if x != nil {
return x.Txs
}
return nil
}
var File_batchtx_proto protoreflect.FileDescriptor
var file_batchtx_proto_rawDesc = []byte{
0x0a, 0x0d, 0x62, 0x61, 0x74, 0x63, 0x68, 0x74, 0x78, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x12,
0x09, 0x6d, 0x75, 0x6c, 0x74, 0x69, 0x73, 0x65, 0x6e, 0x64, 0x22, 0x40, 0x0a, 0x02, 0x54, 0x78,
0x12, 0x12, 0x0a, 0x04, 0x66, 0x72, 0x6f, 0x6d, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x04,
0x66, 0x72, 0x6f, 0x6d, 0x12, 0x0e, 0x0a, 0x02, 0x74, 0x6f, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0c,
0x52, 0x02, 0x74, 0x6f, 0x12, 0x16, 0x0a, 0x06, 0x61, 0x6d, 0x6f, 0x75, 0x6e, 0x74, 0x18, 0x03,
0x20, 0x01, 0x28, 0x03, 0x52, 0x06, 0x61, 0x6d, 0x6f, 0x75, 0x6e, 0x74, 0x22, 0x3e, 0x0a, 0x07,
0x42, 0x61, 0x74, 0x63, 0x68, 0x54, 0x78, 0x12, 0x12, 0x0a, 0x04, 0x68, 0x61, 0x73, 0x68, 0x18,
0x01, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x04, 0x68, 0x61, 0x73, 0x68, 0x12, 0x1f, 0x0a, 0x03, 0x74,
0x78, 0x73, 0x18, 0x02, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x0d, 0x2e, 0x6d, 0x75, 0x6c, 0x74, 0x69,
0x73, 0x65, 0x6e, 0x64, 0x2e, 0x54, 0x78, 0x52, 0x03, 0x74, 0x78, 0x73, 0x42, 0x04, 0x5a, 0x02,
0x2e, 0x2f, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33,
}
var (
file_batchtx_proto_rawDescOnce sync.Once
file_batchtx_proto_rawDescData = file_batchtx_proto_rawDesc
)
func file_batchtx_proto_rawDescGZIP() []byte {
file_batchtx_proto_rawDescOnce.Do(func() {
file_batchtx_proto_rawDescData = protoimpl.X.CompressGZIP(file_batchtx_proto_rawDescData)
})
return file_batchtx_proto_rawDescData
}
var file_batchtx_proto_msgTypes = make([]protoimpl.MessageInfo, 2)
var file_batchtx_proto_goTypes = []interface{}{
(*Tx)(nil), // 0: multisend.Tx
(*BatchTx)(nil), // 1: multisend.BatchTx
}
var file_batchtx_proto_depIdxs = []int32{
0, // 0: multisend.BatchTx.txs:type_name -> multisend.Tx
1, // [1:1] is the sub-list for method output_type
1, // [1:1] is the sub-list for method input_type
1, // [1:1] is the sub-list for extension type_name
1, // [1:1] is the sub-list for extension extendee
0, // [0:1] is the sub-list for field type_name
}
func init() { file_batchtx_proto_init() }
func file_batchtx_proto_init() {
if File_batchtx_proto != nil {
return
}
if !protoimpl.UnsafeEnabled {
file_batchtx_proto_msgTypes[0].Exporter = func(v interface{}, i int) interface{} {
switch v := v.(*Tx); i {
case 0:
return &v.state
case 1:
return &v.sizeCache
case 2:
return &v.unknownFields
default:
return nil
}
}
file_batchtx_proto_msgTypes[1].Exporter = func(v interface{}, i int) interface{} {
switch v := v.(*BatchTx); i {
case 0:
return &v.state
case 1:
return &v.sizeCache
case 2:
return &v.unknownFields
default:
return nil
}
}
}
type x struct{}
out := protoimpl.TypeBuilder{
File: protoimpl.DescBuilder{
GoPackagePath: reflect.TypeOf(x{}).PkgPath(),
RawDescriptor: file_batchtx_proto_rawDesc,
NumEnums: 0,
NumMessages: 2,
NumExtensions: 0,
NumServices: 0,
},
GoTypes: file_batchtx_proto_goTypes,
DependencyIndexes: file_batchtx_proto_depIdxs,
MessageInfos: file_batchtx_proto_msgTypes,
}.Build()
File_batchtx_proto = out.File
file_batchtx_proto_rawDesc = nil
file_batchtx_proto_goTypes = nil
file_batchtx_proto_depIdxs = nil
}
syntax = "proto3";
package multisend;
option go_package = "./";
message Tx{
bytes from = 1;
bytes to = 2;
int64 amount = 3;
}
message BatchTx{
bytes hash = 1;
repeated Tx txs = 2;
}
// message Person {
// string name = 1;
// int32 id = 2; // Unique ID number for this person.
// string email = 3;
// enum PhoneType {
// MOBILE = 0;
// HOME = 1;
// WORK = 2;
// }
// message PhoneNumber {
// string number = 1;
// PhoneType type = 2;
// }
// repeated PhoneNumber phones = 4;
// google.protobuf.Timestamp last_updated = 5;
// }
// // Our address book file is just one of these.
// message AddressBook {
// repeated Person people = 1;
// }
\ No newline at end of file
......@@ -13,6 +13,7 @@ require (
github.com/go-ole/go-ole v1.2.1 // indirect
github.com/go-redis/redis/v8 v8.11.4 // indirect
github.com/go-stack/stack v1.8.0 // indirect
github.com/golang/protobuf v1.5.2 // indirect
github.com/gorilla/websocket v1.4.2 // indirect
github.com/shirou/gopsutil v3.21.4-0.20210419000835-c7a38de76ee5+incompatible // indirect
github.com/sirupsen/logrus v1.8.1 // indirect
......@@ -20,5 +21,6 @@ require (
github.com/tklauser/numcpus v0.2.2 // indirect
golang.org/x/crypto v0.0.0-20210322153248-0c34fe9e7dc2 // indirect
golang.org/x/sys v0.0.0-20210816183151-1e6c022a8912 // indirect
google.golang.org/protobuf v1.27.1 // indirect
gopkg.in/natefinch/npipe.v2 v2.0.0-20160621034901-c1b8fa8bdcce // indirect
)
......@@ -160,6 +160,7 @@ github.com/golang/protobuf v1.4.0/go.mod h1:jodUvKwWbYaEsadDk5Fwe5c77LiNKVO9IDvq
github.com/golang/protobuf v1.4.2/go.mod h1:oDoupMAO8OvCJWAcko0GGGIgR6R6ocIYbsSw735rRwI=
github.com/golang/protobuf v1.4.3/go.mod h1:oDoupMAO8OvCJWAcko0GGGIgR6R6ocIYbsSw735rRwI=
github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaSAoJOfIk=
github.com/golang/protobuf v1.5.2 h1:ROPKBNFfQgOUMifHyP+KYbvpjbdoFNs+aK7DXlji0Tw=
github.com/golang/protobuf v1.5.2/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY=
github.com/golang/snappy v0.0.0-20180518054509-2e65f85255db/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
github.com/golang/snappy v0.0.3/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
......@@ -565,6 +566,8 @@ google.golang.org/protobuf v1.21.0/go.mod h1:47Nbq4nVaFHyn7ilMalzfO3qCViNmqZ2kzi
google.golang.org/protobuf v1.23.0/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU=
google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw=
google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc=
google.golang.org/protobuf v1.27.1 h1:SnqbnDw1V7RiZcXPx5MEeqPv2s79L9i7BJUlG/+RurQ=
google.golang.org/protobuf v1.27.1/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc=
gopkg.in/alecthomas/kingpin.v2 v2.2.6/go.mod h1:FMv+mEhP44yOT+4EoQTLFTRgOQ1FBLkstjWtayDeSgw=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
......
......@@ -9,20 +9,19 @@ import (
"crypto/sha256"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/crypto"
// "github.com/cbergoon/merkletree"
//"github.com/cbergoon/merkletree"
)
var originalTxParam EthClient
var originTxPrivateKey string = "9e0944f587e1043d6e303644738b0c7c77ed15b176ca574ed0be40c0b9bbdc3a"
var originalTxsHashQueue chan *[]byte = make(chan *[]byte, 1000)
var originalTxsWithFromQueue chan *TxsHash = make(chan *TxsHash, 2000000)
var batchTxsForRedis chan *BatchTx = make(chan *BatchTx, batchTxHashSize*batchTxHashQueueSize)
const batchSize = 100
const hashRootSize = 200
const batchTxSize = 5000
const batchTxHashSize = 100
const batchTxHashQueueSize = 10
func init() {
......@@ -45,48 +44,28 @@ func init() {
}
type TxsHash struct {
Hash []byte
Txs []TxWithFrom
}
type TxWithFrom struct {
From common.Address `json:md5`
Tx *types.Transaction `json:tx`
}
// //CalculateHash hashes the values of a TestContent
// func (t TxWithFrom) CalculateHash() ([]byte, error) {
// hash := t.Tx.Hash()
// return hash[:], nil
// }
// //Equals tests for equality of two Contents
// func (t TxWithFrom) Equals(other merkletree.Content) (bool, error) {
// return t.Tx.Hash().Big().Cmp(other.(TxWithFrom).Tx.Hash().Big()) == 0, nil
// }
func ProduceOriginalTx() error {
for {
//fmt.Printf("len(originalTxQueue): %d len(originalMd5TxQueue): %d \n", len(originalTxQueue), len(originalMd5TxQueue))
if len(originalTxsHashQueue) < 200 {
if len(originalTxsHashQueue) < batchTxHashSize*batchTxHashQueueSize {
var hashesBytes []byte = make([]byte, 0, hashRootSize)
var hashesBytes []byte = make([]byte, 0, 32*batchTxHashSize)
for j := 0; j < hashRootSize; j++ {
for j := 0; j < batchTxHashSize; j++ {
var txsBytes []byte
var txsWithFrom []TxWithFrom = make([]TxWithFrom, 0, batchSize)
for i := 0; i < batchSize; i++ {
var txsWithFrom []*Tx = make([]*Tx, 0, batchTxSize)
for i := 0; i < batchTxSize; i++ {
tx, err := buildOriginalTx(originalTxParam.Nonce, toAddress, big.NewInt(256), nil)
if err != nil {
return err
}
txsWithFrom = append(txsWithFrom, TxWithFrom{
From: originalTxParam.FromAddr,
Tx: tx,
txsWithFrom = append(txsWithFrom, &Tx{
From: originalTxParam.FromAddr.Bytes(),
To: tx.To().Bytes(),
Amount: tx.Value().Int64(),
})
originalTxParam.Nonce += 1
......@@ -106,16 +85,15 @@ func ProduceOriginalTx() error {
hashBytes := h.Sum(nil)
hashesBytes = append(hashesBytes, hashBytes...)
txs := TxsHash{hashBytes, txsWithFrom}
originalTxsWithFromQueue <- &txs
txs := BatchTx{Hash: hashBytes, Txs: txsWithFrom}
batchTxsForRedis <- &txs
}
originalTxsHashQueue <- &hashesBytes
} else {
return nil
time.Sleep(time.Millisecond * 1)
//return nil
time.Sleep(time.Hour * 1)
}
}
}
// var ctx = context.Background()
......
package multisend
import (
"fmt"
sync "sync"
"testing"
"time"
)
func TestProduceTx(t *testing.T) {
//StartProduceTx()
ProduceOriginalTx()
//ProduceOriginalTx()
wg := sync.WaitGroup{}
wg.Add(2)
go func() {
defer wg.Done()
if err := ProduceOriginalTx(); err != nil {
fmt.Printf("ProduceOriginalTx stop err: %s\n", err.Error())
}
}()
for {
if len(batchTxsForRedis) >= batchTxHashSize*batchTxHashQueueSize {
break
}
t.Logf("waiting for produce original tx, len(originalTxsForRedis):%d \n", len(batchTxsForRedis))
time.Sleep(2 * time.Second)
}
go func() {
defer wg.Done()
Start()
}()
wg.Wait()
}
......@@ -2,11 +2,12 @@ package multisend
import (
"context"
"encoding/json"
"fmt"
"runtime"
"time"
"github.com/golang/protobuf/proto"
"github.com/go-redis/redis/v8"
)
......@@ -16,6 +17,7 @@ var (
type Job struct {
Client *redis.Client
Id int
}
func initClient(poolSize int) *redis.Client {
......@@ -44,19 +46,23 @@ func Start() {
//defer client.Close()
//定义每个任务执行的方法
jobfunc := func(client *redis.Client) error {
jobfunc := func(client *redis.Client, id int) error {
count := 0
for {
select {
case txs := <-originalTxsWithFromQueue:
txsAsJson, err := json.Marshal(txs)
case txs := <-batchTxsForRedis:
startTime := time.Now()
data, err := proto.Marshal(txs)
if err != nil {
return err
}
if err := client.LPush(context.Background(), "list", txsAsJson).Err(); err != nil {
fmt.Println(err.Error())
if err := client.LPush(context.Background(), "list", data).Err(); err != nil {
return err
}
count += 1
fmt.Printf("id: %d send %d txs size: %d takes %v \n", id, count, len(data), time.Since(startTime))
}
}
}
......@@ -64,7 +70,7 @@ func Start() {
//1 添加 job 到 channel
go func() {
for index := 0; index < jobnum; index++ {
jobs <- Job{client}
jobs <- Job{client, index}
}
defer close(jobs)
}()
......@@ -72,7 +78,7 @@ func Start() {
//2 并行执行 jobs
for j := range jobs {
go func(job Job) {
if err := jobfunc(client); err != nil {
if err := jobfunc(client, j.Id); err != nil {
panic(err.Error())
}
}(j)
......
......@@ -10,7 +10,7 @@ func TestTransactor(t *testing.T) {
go StartProduceTx()
for {
if len(originalTxsHashQueue) >= 100 {
if len(originalTxsHashQueue) >= 10 {
break
}
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment