Commit 78e6dace authored by 李伟@五瓣科技's avatar 李伟@五瓣科技

check transactor stop

parent ae40c636
......@@ -522,6 +522,10 @@ func (web *WebServicer) TxsHandler(w http.ResponseWriter, r *http.Request) {
SetSendRecord(id, SendRecord{TotalConsTx: int64(total)})
if web.transactor.mustStop() {
web.transactor.Start()
}
go func() {
if err := web.sendLoop(params.From, params.ToAddrs, int(params.TxCount), params.EveryTxAmount, id, params.RequestAmount); err != nil {
......
package multisend
import (
"math/rand"
"testing"
"time"
)
func TestWebService(t *testing.T) {
// WebService()
// w :=WebServicer{}
// repeat := func(done <-chan interface{}, values ...interface{}) <-chan interface{} {
// valueStream := make(chan interface{})
// go func() {
// defer close(valueStream)
// for {
// for _, v := range values {
// select {
// case <-done:
// return
// case valueStream <- v:
// }
// }
// }
// }()
// return valueStream
// }
repeatFn := func(done <-chan interface{}, fn func() interface{}) <-chan interface{} {
valueStream := make(chan interface{})
go func() {
defer close(valueStream)
for {
select {
case <-done:
return
case valueStream <- fn():
}
}
}()
return valueStream
}
take := func(done <-chan interface{}, valueStream <-chan interface{}, num int) <-chan interface{} {
takeStream := make(chan interface{})
go func() {
defer close(takeStream)
for i := 0; i < num; i++ {
select {
case <-done:
return
case takeStream <- valueStream:
}
}
}()
return takeStream
}
done := make(chan interface{})
defer close(done)
rand := func() interface{} { return rand.Int() }
takeRes := take(done, repeatFn(done, rand), 10)
t.Logf("%v %T\n", <-takeRes, takeRes)
for {
select {
case num := <-takeRes:
t.Logf("%v", num)
}
}
// for num := range take(done, repeatFn(done, rand), 10) {
// t.Logf("%v %T\n", num, num)
// }
}
// multiple := func(values []int, multiplier int) []int{
// multipliedValues := make([]int, len(values))
// for k,v := range values{
// multipliedValues[k] = v * multiplier
// }
// return multipliedValues
// }
// add := func(values []int, additive int)[]int{
// addedValues := make([]int, len(values))
// for k,v := range values{
// addedValues[i] = v + additive
// }
// return addedValues
// }
func TestRepeatTake(t *testing.T) {
repeatFn := func(done <-chan interface{}, fn func() interface{}) <-chan interface{} {
valueStream := make(chan interface{})
go func() {
defer close(valueStream)
for {
select {
case <-done:
return
case valueStream <- fn():
}
}
}()
return valueStream
}
take := func(done <-chan interface{}, valueStream <-chan interface{}, num int) <-chan interface{} {
takeStream := make(chan interface{})
go func() {
defer close(takeStream)
for i := 0; i < num; i++ {
select {
case <-done:
return
case takeStream <- <-valueStream:
}
}
}()
return takeStream
}
done := make(chan interface{})
defer close(done)
rand := func() interface{} {
// tx, err := buildOriginalTx(originalTxParam.Nonce, toAddress, 10, big.NewInt(256), nil)
// if err != nil {
// }
originalTxParam.Nonce++
if originalTxParam.Nonce < 10 {
t.Logf("originalTxParam.Nonce: %d \n", originalTxParam.Nonce)
}
return originalTxParam.Nonce
}
takeRes := take(done, repeatFn(done, rand), 1000)
for num := range takeRes {
t.Logf("%v %T", num, num)
}
}
func TestFanInFanOut(t *testing.T) {
repeat := func(done <-chan interface{}, values ...interface{}) <-chan interface{} {
valueStream := make(chan interface{})
go func() {
defer close(valueStream)
for {
for _, v := range values {
select {
case <-done:
return
case valueStream <- v:
}
}
}
}()
return valueStream
}
repeatFn := func(done <-chan interface{}, fn func() interface{}) <-chan interface{} {
valueStream := make(chan interface{})
go func() {
defer close(valueStream)
for {
select {
case <-done:
return
case valueStream <- fn():
}
}
}()
return valueStream
}
take := func(done <-chan interface{}, valueStream <-chan interface{}, num int) <-chan interface{} {
takeStream := make(chan interface{})
go func() {
defer close(takeStream)
for i := 0; i < num; i++ {
select {
case <-done:
return
case takeStream <- <-valueStream:
}
}
}()
return takeStream
}
sleep := func(done <-chan interface{}, valueStream <-chan interface{}, s time.Duration) <-chan interface{} {
sleepStream := make(chan interface{})
go func() {
defer close(sleepStream)
sleepTimer := time.NewTimer(s)
for {
select {
case <-done:
return
case <-sleepTimer.C:
for v := range valueStream {
sleepStream <- v
}
return
}
}
}()
return sleepStream
}
orDone := func(done, c <-chan interface{}) <-chan interface{} {
valStream := make(chan interface{})
go func() {
defer close(valStream)
for {
select {
case <-done:
return
case v, ok := <-c:
if ok == false {
return
}
select {
case valStream <- v:
case <-done:
}
}
}
}()
return valStream
}
tee := func(
done <-chan interface{},
in <-chan interface{},
) (<-chan interface{}, <-chan interface{}) {
out1 := make(chan interface{})
out2 := make(chan interface{})
go func() {
defer close(out1)
defer close(out2)
for val := range orDone(done, in) {
var out1, out2 = out1, out2
for i := 0; i < 2; i++ {
select {
case <-done:
case out1 <- val:
out1 = nil
case out2 <- val:
out2 = nil
}
}
}
}()
return out1, out2
}
_, _ = repeat, tee
rand := func() interface{} {
// tx, err := buildOriginalTx(originalTxParam.Nonce, toAddress, 10, big.NewInt(256), nil)
// if err != nil {
// }
originalTxParam.Nonce++
return originalTxParam.Nonce
}
//done := make(chan interface{})
//defer close(done)
for {
done := make(chan interface{})
// sleepSream := sleep(done, take(done, repeatFn(done, rand), 4), 3*time.Second)
// for num := range sleepSream {
// t.Logf("num: %v \n", num)
// }
out1, out2 := tee(done, sleep(done, take(done, repeatFn(done, rand), 4), 3*time.Second))
for val1 := range out1 {
if val1.(uint64) < 100 {
t.Logf("out1: %v , out2: %v \n", val1, <-out2)
}
}
close(done)
}
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment