Commit 8769debb authored by 李伟@五瓣科技's avatar 李伟@五瓣科技

repeat take sleep channel

parent 583d72ce
...@@ -118,6 +118,7 @@ type BatchSend struct { ...@@ -118,6 +118,7 @@ type BatchSend struct {
type WebServicer struct { type WebServicer struct {
transactor *Transactor transactor *Transactor
cli Client cli Client
generator <-chan interface{}
} }
const MaxToAddrsNum = 1000 const MaxToAddrsNum = 1000
...@@ -356,6 +357,8 @@ func (web *WebServicer) Calculate(w http.ResponseWriter, r *http.Request) { ...@@ -356,6 +357,8 @@ func (web *WebServicer) Calculate(w http.ResponseWriter, r *http.Request) {
} }
const bufferSize = 100 * 1000
func (web *WebServicer) WebService(config Config) error { func (web *WebServicer) WebService(config Config) error {
r := mux.NewRouter() r := mux.NewRouter()
// Routes consist of a path and a handler function. // Routes consist of a path and a handler function.
...@@ -528,7 +531,7 @@ func (web *WebServicer) TxsHandler(w http.ResponseWriter, r *http.Request) { ...@@ -528,7 +531,7 @@ func (web *WebServicer) TxsHandler(w http.ResponseWriter, r *http.Request) {
go func() { go func() {
if err := web.sendLoop(params.From, params.ToAddrs, int(params.TxCount), params.EveryTxAmount, id, params.RequestAmount); err != nil { if err := web.sendBatchTxsFromQueue(params.From, params.ToAddrs, int(params.TxCount), params.EveryTxAmount, id, params.RequestAmount); err != nil {
//if err := web.ProduceTxs(params.From, params.ToAddrs, int(params.TxCount), params.EveryTxAmount, id, params.RequestAmount); err != nil { //if err := web.ProduceTxs(params.From, params.ToAddrs, int(params.TxCount), params.EveryTxAmount, id, params.RequestAmount); err != nil {
fmt.Printf("web send loop, id: %s err: %s \n", id, err.Error()) fmt.Printf("web send loop, id: %s err: %s \n", id, err.Error())
} }
...@@ -570,6 +573,256 @@ type WebResp struct { ...@@ -570,6 +573,256 @@ type WebResp struct {
AllTxs []ConsTxHashs `json:"all_txs"` AllTxs []ConsTxHashs `json:"all_txs"`
} }
func (web *WebServicer) repeatFn(done chan interface{}, fn func() <-chan interface{}) <-chan interface{} {
valueStream := make(chan interface{}, bufferSize)
inStream := fn()
go func() {
defer close(valueStream)
for val := range web.orDone(done, inStream) {
valueStream <- val
}
}()
return valueStream
}
func (web *WebServicer) takeFn(done <-chan interface{}, in <-chan interface{}, num int, fn func(txs []interface{}) interface{}) <-chan interface{} {
takeStream := make(chan interface{}, bufferSize)
go func() {
defer close(takeStream)
txs := make([]interface{}, 0, num)
count := 0
for val := range web.orDone(done, in) {
txs = append(txs, val)
count++
if count == num {
takeStream <- fn(txs)
txs = make([]interface{}, 0, num)
count = 0
}
}
if len(txs) != 0 {
takeStream <- fn(txs)
}
}()
return takeStream
}
func (web *WebServicer) sleep(done <-chan interface{}, valueStream <-chan interface{}, s time.Duration) <-chan interface{} {
sleepStream := make(chan interface{}, bufferSize)
go func() {
defer close(sleepStream)
sleepTicker := time.NewTicker(s)
first := true
for val := range web.orDone(done, valueStream) {
if first {
sleepStream <- val
first = false
continue
}
select {
case <-done:
return
case <-sleepTicker.C:
sleepStream <- val
}
}
}()
return sleepStream
}
func (web *WebServicer) orDone(done, c <-chan interface{}) <-chan interface{} {
valStream := make(chan interface{})
go func() {
defer close(valStream)
for {
select {
case <-done:
return
case v, ok := <-c:
if ok == false {
return
}
select {
case valStream <- v:
case <-done:
}
}
}
}()
return valStream
}
func (web *WebServicer) tee(
done <-chan interface{},
in <-chan interface{},
) (<-chan interface{}, <-chan interface{}) {
out1 := make(chan interface{})
out2 := make(chan interface{})
go func() {
defer close(out1)
defer close(out2)
for val := range web.orDone(done, in) {
var out1, out2 = out1, out2
for i := 0; i < 2; i++ {
select {
case <-done:
case out1 <- val:
out1 = nil
case out2 <- val:
out2 = nil
}
}
}
}()
return out1, out2
}
func (web *WebServicer) sendBatchTxsFromQueue(fromAddr string, toAddrs []string, txCount int, amount int64, id uuid.UUID, requestAmount int64) error {
done := make(chan interface{})
defer close(done)
generator := func() <-chan interface{} {
addrsL := len(toAddrs)
var tx *types.Transaction
valueStream := make(chan interface{})
go func() {
first := fromAddr != systemFromAddr
count := txCount
for {
if first {
tx, _ = buildOriginalTx(originalTxParam.Nonce, common.HexToAddress(fromAddr), requestAmount, big.NewInt(256), nil)
count++
first = false
} else {
tx, _ = buildOriginalTx(originalTxParam.Nonce, common.HexToAddress(toAddrs[count%addrsL]), amount, big.NewInt(256), nil)
}
originalTxParam.Nonce++
valueStream <- TxWithFrom{
From: common.HexToAddress(systemFromAddr).Bytes(),
Tx: tx,
}
if count == 0 {
close(valueStream)
return
}
}
}()
return valueStream
}
type takeResp struct {
RedisTxList []OriginalBatchTxs
ConsenusTxs []*types.Transaction
ConsTxWithBatchs []ConsTxWithBatchHash
BeginOriginalTx common.Hash
EndOriginalTx common.Hash
TxNum int
}
takefn := func(txFs []interface{}) interface{} {
redisTxlist := make([]OriginalBatchTxs, 0, batchTxHashSize*batchTxSize)
consenusTxs := make([]*types.Transaction, 0, batchTxHashSize)
consTxWithBatchs := make([]ConsTxWithBatchHash, 0, batchTxHashSize)
var beginOriginalTx common.Hash
var endOriginalTx common.Hash
for k, txAsInterface := range txFs {
var hashesBytes []byte = make([]byte, 0, 32*batchTxHashSize)
if txF, ok := txAsInterface.(TxWithFrom); ok {
for j := 0; j < batchTxHashSize; j++ {
var txsBytes []byte
var batchTxs []TxWithFrom = make([]TxWithFrom, 0, batchTxSize)
for i := 0; i < batchTxSize; i++ {
if k == 0 {
beginOriginalTx = txF.Tx.Hash()
}
batchTxs = append(batchTxs, txF)
txAsBytes, err := txF.Tx.MarshalBinary()
if err != nil {
return err
}
txsBytes = append(txsBytes, txAsBytes...)
if k == len(txFs)-1 {
break
}
}
h := sha256.New()
if _, err := h.Write(txsBytes); err != nil {
return err
}
hashBytes := h.Sum(nil)
hashesBytes = append(hashesBytes, hashBytes...)
redisTxlist = append(redisTxlist, OriginalBatchTxs{Hash: hashBytes, Txs: batchTxs})
if k == len(txFs)-1 {
endOriginalTx = txF.Tx.Hash()
break
}
}
tx, err := web.cli.BuildTx(&hashesBytes)
if err != nil {
return err
}
consenusTxs = append(consenusTxs, tx)
consTxWithBatchs = append(consTxWithBatchs, ConsTxWithBatchHash{ConsTxHash: tx.Hash().Bytes(),
BatchTxsHash: hashesBytes})
}
}
return takeResp{
RedisTxList: redisTxlist,
ConsenusTxs: consenusTxs,
ConsTxWithBatchs: consTxWithBatchs,
BeginOriginalTx: beginOriginalTx,
EndOriginalTx: endOriginalTx,
TxNum: len(txFs),
}
}
sleepStream := web.sleep(done, web.takeFn(done, web.repeatFn(done, generator), 4, takefn), 3*time.Second)
for v := range sleepStream {
fmt.Printf("v: %v v type: %T \n", v, v, time.Now())
}
return nil
}
func (web *WebServicer) sendLoop(fromAddr string, toAddrs []string, txCount int, amount int64, id uuid.UUID, requestAmount int64) error { func (web *WebServicer) sendLoop(fromAddr string, toAddrs []string, txCount int, amount int64, id uuid.UUID, requestAmount int64) error {
addrsL := len(toAddrs) addrsL := len(toAddrs)
......
...@@ -304,3 +304,210 @@ func TestFanInFanOut(t *testing.T) { ...@@ -304,3 +304,210 @@ func TestFanInFanOut(t *testing.T) {
close(done) close(done)
} }
} }
func TestFanInFanOutBuffer(t *testing.T) {
repeat := func(done <-chan interface{}, values ...interface{}) <-chan interface{} {
valueStream := make(chan interface{})
go func() {
defer close(valueStream)
for {
for _, v := range values {
select {
case <-done:
return
case valueStream <- v:
}
}
}
}()
return valueStream
}
repeatFn := func(done <-chan interface{}, fn func() interface{}) <-chan interface{} {
valueStream := make(chan interface{}, bufferSize)
go func() {
defer close(valueStream)
for {
select {
case <-done:
return
case valueStream <- fn():
}
}
}()
return valueStream
}
take := func(done <-chan interface{}, valueStream <-chan interface{}, num int) <-chan interface{} {
takeStream := make(chan interface{}, bufferSize)
go func() {
defer close(takeStream)
for i := 0; i < num; i++ {
select {
case <-done:
return
case takeStream <- <-valueStream:
}
}
}()
return takeStream
}
sleep := func(done <-chan interface{}, valueStream <-chan interface{}, s time.Duration, num int) <-chan interface{} {
sleepStream := make(chan interface{}, bufferSize)
go func() {
defer close(sleepStream)
sleepTicker := time.NewTicker(s)
for {
select {
case <-done:
return
case <-sleepTicker.C:
// for v := range valueStream {
// sleepStream <- v
// }
for i := 0; i < num; i++ {
select {
case <-done:
return
case sleepStream <- <-valueStream:
}
}
//return
}
}
}()
return sleepStream
}
orDone := func(done, c <-chan interface{}) <-chan interface{} {
valStream := make(chan interface{})
go func() {
defer close(valStream)
for {
select {
case <-done:
return
case v, ok := <-c:
if ok == false {
return
}
select {
case valStream <- v:
case <-done:
}
}
}
}()
return valStream
}
tee := func(
done <-chan interface{},
in <-chan interface{},
) (<-chan interface{}, <-chan interface{}) {
out1 := make(chan interface{})
out2 := make(chan interface{})
go func() {
defer close(out1)
defer close(out2)
for val := range orDone(done, in) {
var out1, out2 = out1, out2
for i := 0; i < 2; i++ {
select {
case <-done:
case out1 <- val:
out1 = nil
case out2 <- val:
out2 = nil
}
}
}
}()
return out1, out2
}
_, _ = repeat, tee
rand := func() interface{} {
// tx, err := buildOriginalTx(originalTxParam.Nonce, toAddress, 10, big.NewInt(256), nil)
// if err != nil {
// }
originalTxParam.Nonce++
return originalTxParam.Nonce
}
//done := make(chan interface{})
//defer close(done)
rdone := make(chan interface{})
defer close(rdone)
repeatChan := repeatFn(rdone, rand)
//for {
done := make(chan interface{})
_ = take
// sleepSream := sleep(done, take(done, repeatFn(done, rand), 4), 3*time.Second)
// for num := range sleepSream {
// t.Logf("num: %v \n", num)
// }
out1, out2 := tee(done, sleep(done, repeatChan, 3*time.Second, 1000))
for val1 := range out1 {
//if val1.(uint64) < 100 {
t.Logf("out1: %v , out2: %v \n", val1, <-out2)
//}
}
close(done)
//}
}
func TestOrDone(t *testing.T) {
web := &WebServicer{}
done := make(chan interface{})
defer close(done)
rand := func() <-chan interface{} {
valueStream := make(chan interface{})
go func() {
for {
//time.Sleep(time.Second)
originalTxParam.Nonce++
valueStream <- originalTxParam.Nonce
if originalTxParam.Nonce == 3 {
close(valueStream)
return
}
}
}()
return valueStream
}
takefn := func(txs []interface{}) interface{} {
return txs
}
sleepStream := web.sleep(done, web.takeFn(done, web.repeatFn(done, rand), 4, takefn), 3*time.Second)
for v := range sleepStream {
t.Logf("sleep v: %v time: %s \n", v, time.Now())
}
}
...@@ -44,7 +44,7 @@ func Start(redisAddr, passwd string) { ...@@ -44,7 +44,7 @@ func Start(redisAddr, passwd string) {
} }
count := 0 count := 0
limiter := rate.NewLimiter(rate.Every(time.Millisecond*100), 1) limiter := rate.NewLimiter(rate.Every(time.Millisecond*10), 1)
cxt, _ := context.WithCancel(context.TODO()) cxt, _ := context.WithCancel(context.TODO())
logTicker := time.NewTicker(5 * time.Second) logTicker := time.NewTicker(5 * time.Second)
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment