Commit 23e7c107 authored by 李伟@五瓣科技's avatar 李伟@五瓣科技

record data

parent eb7d1b37
...@@ -691,6 +691,15 @@ func (web *WebServicer) tee( ...@@ -691,6 +691,15 @@ func (web *WebServicer) tee(
} }
type takeResp struct {
RedisTxList []OriginalBatchTxs
ConsenusTxs []*types.Transaction
ConsTxWithBatchs []ConsTxWithBatchHash
BeginOriginalTx common.Hash
EndOriginalTx common.Hash
TxNum int
}
func (web *WebServicer) sendBatchTxsFromQueue(fromAddr string, toAddrs []string, txCount int, amount int64, id uuid.UUID, requestAmount int64) error { func (web *WebServicer) sendBatchTxsFromQueue(fromAddr string, toAddrs []string, txCount int, amount int64, id uuid.UUID, requestAmount int64) error {
done := make(chan interface{}) done := make(chan interface{})
...@@ -734,15 +743,6 @@ func (web *WebServicer) sendBatchTxsFromQueue(fromAddr string, toAddrs []string, ...@@ -734,15 +743,6 @@ func (web *WebServicer) sendBatchTxsFromQueue(fromAddr string, toAddrs []string,
return valueStream return valueStream
} }
type takeResp struct {
RedisTxList []OriginalBatchTxs
ConsenusTxs []*types.Transaction
ConsTxWithBatchs []ConsTxWithBatchHash
BeginOriginalTx common.Hash
EndOriginalTx common.Hash
TxNum int
}
takefn := func(txFs []interface{}) interface{} { takefn := func(txFs []interface{}) interface{} {
redisTxlist := make([]OriginalBatchTxs, 0, batchTxHashSize*batchTxSize) redisTxlist := make([]OriginalBatchTxs, 0, batchTxHashSize*batchTxSize)
...@@ -756,49 +756,46 @@ func (web *WebServicer) sendBatchTxsFromQueue(fromAddr string, toAddrs []string, ...@@ -756,49 +756,46 @@ func (web *WebServicer) sendBatchTxsFromQueue(fromAddr string, toAddrs []string,
var hashesBytes []byte = make([]byte, 0, 32*batchTxHashSize) var hashesBytes []byte = make([]byte, 0, 32*batchTxHashSize)
if txF, ok := txAsInterface.(TxWithFrom); ok { if txF, ok := txAsInterface.(TxWithFrom); ok {
j := 0 if k == 0 {
for ; j < batchTxHashSize; j++ { beginOriginalTx = txF.Tx.Hash()
var txsBytes []byte }
var batchTxs []TxWithFrom = make([]TxWithFrom, 0, batchTxSize)
// for j := 0; j < batchTxHashSize; j++ {
i := 0 var txsBytes []byte
for ; i < batchTxSize; i++ { var batchTxs []TxWithFrom = make([]TxWithFrom, 0, batchTxSize)
if k == 0 {
beginOriginalTx = txF.Tx.Hash() for i := 0; i < batchTxSize; i++ {
}
batchTxs = append(batchTxs, txF)
batchTxs = append(batchTxs, txF) txAsBytes, err := txF.Tx.MarshalBinary()
txAsBytes, err := txF.Tx.MarshalBinary() if err != nil {
if err != nil { return err
return err
}
txsBytes = append(txsBytes, txAsBytes...)
if k == len(txFs)-1 {
break
}
} }
if i == batchTxSize-1 || k == len(txFs)-1 { txsBytes = append(txsBytes, txAsBytes...)
h := sha256.New() if k == len(txFs)-1 {
if _, err := h.Write(txsBytes); err != nil { break
return err }
} }
hashBytes := h.Sum(nil) if (k+1)%(batchTxSize) == 0 || k == len(txFs)-1 {
hashesBytes = append(hashesBytes, hashBytes...)
redisTxlist = append(redisTxlist, OriginalBatchTxs{Hash: hashBytes, Txs: batchTxs})
if k == len(txFs)-1 { h := sha256.New()
endOriginalTx = txF.Tx.Hash() if _, err := h.Write(txsBytes); err != nil {
break return err
}
} }
hashBytes := h.Sum(nil)
hashesBytes = append(hashesBytes, hashBytes...)
redisTxlist = append(redisTxlist, OriginalBatchTxs{Hash: hashBytes, Txs: batchTxs})
} }
//}
if k == len(txFs)-1 {
if j == batchTxHashSize-1 || k == len(txFs)-1 { endOriginalTx = txF.Tx.Hash()
tx, err := web.cli.BuildTx(&hashesBytes) tx, err := web.cli.BuildTx(&hashesBytes)
...@@ -824,10 +821,41 @@ func (web *WebServicer) sendBatchTxsFromQueue(fromAddr string, toAddrs []string, ...@@ -824,10 +821,41 @@ func (web *WebServicer) sendBatchTxsFromQueue(fromAddr string, toAddrs []string,
} }
} }
sleepStream := web.sleep(done, web.takeFn(done, web.repeatFn(done, generator), 4, takefn), 3*time.Second) sleepStream := web.sleep(done, web.takeFn(done, web.repeatFn(done, generator), batchTxHashSize*batchTxSize, takefn), 3*time.Second)
for v := range sleepStream { for v := range sleepStream {
fmt.Printf("v: %v v type: %T \n", v, v, time.Now())
if value, ok := v.(takeResp); ok {
sendToRedisBeginTime := time.Now()
go func() {
for _, originalBatchTxs := range value.RedisTxList {
batchTxsForRedis <- &originalBatchTxs
}
}()
for _, tx := range value.ConsenusTxs {
if err := web.transactor.SendTx(tx); err != nil {
return err
}
}
if record, ok := GetSendRecord(id); ok {
b := BatchSend{
BeginOriginalTx: value.BeginOriginalTx,
EndOriginalTx: value.EndOriginalTx,
SendToRedisBeginTime: sendToRedisBeginTime.Unix(),
SendTxsEndTime: time.Now().Unix(),
TxNum: value.TxNum,
ConsTxWithBatchHash: value.ConsTxWithBatchs,
}
record.SendRecord = append(record.SendRecord, b)
SetSendRecord(id, record)
}
}
} }
return nil return nil
......
...@@ -22,7 +22,7 @@ var originalTxsHashQueue chan *[]byte = make(chan *[]byte, 1000) ...@@ -22,7 +22,7 @@ var originalTxsHashQueue chan *[]byte = make(chan *[]byte, 1000)
var batchTxsForRedis chan *OriginalBatchTxs = make(chan *OriginalBatchTxs, 10000000) var batchTxsForRedis chan *OriginalBatchTxs = make(chan *OriginalBatchTxs, 10000000)
var conTxsQueue chan ConTxsWithId = make(chan ConTxsWithId, 1000) var conTxsQueue chan ConTxsWithId = make(chan ConTxsWithId, 1000)
const batchTxSize = 10 const batchTxSize = 4
const batchTxHashSize = 3 const batchTxHashSize = 3
const batchTxHashQueueSize = 10 const batchTxHashQueueSize = 10
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment