Commit 08c92e90 authored by Esad Akar's avatar Esad Akar Committed by GitHub

TestRetrievePreemptiveRetry (#1461)

parent c6d018ed
...@@ -11,6 +11,7 @@ import ( ...@@ -11,6 +11,7 @@ import (
"errors" "errors"
"fmt" "fmt"
"io/ioutil" "io/ioutil"
"sync"
"testing" "testing"
"time" "time"
...@@ -230,7 +231,6 @@ func TestRetrieveChunk(t *testing.T) { ...@@ -230,7 +231,6 @@ func TestRetrieveChunk(t *testing.T) {
} }
func TestRetrievePreemptiveRetry(t *testing.T) { func TestRetrievePreemptiveRetry(t *testing.T) {
t.Skip("needs some more tendering. baseaddr change made a mess here")
logger := logging.New(ioutil.Discard, 0) logger := logging.New(ioutil.Discard, 0)
chunk := testingc.FixtureChunk("0025") chunk := testingc.FixtureChunk("0025")
...@@ -289,6 +289,8 @@ func TestRetrievePreemptiveRetry(t *testing.T) { ...@@ -289,6 +289,8 @@ func TestRetrievePreemptiveRetry(t *testing.T) {
server2 := retrieval.New(serverAddress2, serverStorer2, nil, noPeerSuggester, logger, accountingmock.NewAccounting(), pricerMock, nil) server2 := retrieval.New(serverAddress2, serverStorer2, nil, noPeerSuggester, logger, accountingmock.NewAccounting(), pricerMock, nil)
t.Run("peer not reachable", func(t *testing.T) { t.Run("peer not reachable", func(t *testing.T) {
ranOnce := true
ranMux := sync.Mutex{}
recorder := streamtest.New( recorder := streamtest.New(
streamtest.WithProtocols( streamtest.WithProtocols(
server1.Protocol(), server1.Protocol(),
...@@ -297,19 +299,19 @@ func TestRetrievePreemptiveRetry(t *testing.T) { ...@@ -297,19 +299,19 @@ func TestRetrievePreemptiveRetry(t *testing.T) {
streamtest.WithMiddlewares( streamtest.WithMiddlewares(
func(h p2p.HandlerFunc) p2p.HandlerFunc { func(h p2p.HandlerFunc) p2p.HandlerFunc {
return func(ctx context.Context, peer p2p.Peer, stream p2p.Stream) error { return func(ctx context.Context, peer p2p.Peer, stream p2p.Stream) error {
ranMux.Lock()
defer ranMux.Unlock()
// NOTE: return error for peer1 // NOTE: return error for peer1
if serverAddress1.Equal(peer.Address) { if ranOnce {
ranOnce = false
return fmt.Errorf("peer not reachable: %s", peer.Address.String()) return fmt.Errorf("peer not reachable: %s", peer.Address.String())
} }
if serverAddress2.Equal(peer.Address) { return server2.Handler(ctx, peer, stream)
return server2.Handler(ctx, peer, stream)
}
return fmt.Errorf("unknown peer: %s", peer.Address.String())
} }
}, },
), ),
streamtest.WithBaseAddr(clientAddress),
) )
client := retrieval.New(clientAddress, nil, recorder, peerSuggesterFn(peers...), logger, accountingmock.NewAccounting(), pricerMock, nil) client := retrieval.New(clientAddress, nil, recorder, peerSuggesterFn(peers...), logger, accountingmock.NewAccounting(), pricerMock, nil)
...@@ -325,6 +327,8 @@ func TestRetrievePreemptiveRetry(t *testing.T) { ...@@ -325,6 +327,8 @@ func TestRetrievePreemptiveRetry(t *testing.T) {
}) })
t.Run("peer does not have chunk", func(t *testing.T) { t.Run("peer does not have chunk", func(t *testing.T) {
ranOnce := true
ranMux := sync.Mutex{}
recorder := streamtest.New( recorder := streamtest.New(
streamtest.WithProtocols( streamtest.WithProtocols(
server1.Protocol(), server1.Protocol(),
...@@ -333,15 +337,14 @@ func TestRetrievePreemptiveRetry(t *testing.T) { ...@@ -333,15 +337,14 @@ func TestRetrievePreemptiveRetry(t *testing.T) {
streamtest.WithMiddlewares( streamtest.WithMiddlewares(
func(h p2p.HandlerFunc) p2p.HandlerFunc { func(h p2p.HandlerFunc) p2p.HandlerFunc {
return func(ctx context.Context, peer p2p.Peer, stream p2p.Stream) error { return func(ctx context.Context, peer p2p.Peer, stream p2p.Stream) error {
if serverAddress1.Equal(peer.Address) { ranMux.Lock()
defer ranMux.Unlock()
if ranOnce {
ranOnce = false
return server1.Handler(ctx, peer, stream) return server1.Handler(ctx, peer, stream)
} }
if serverAddress2.Equal(peer.Address) { return server2.Handler(ctx, peer, stream)
return server2.Handler(ctx, peer, stream)
}
return fmt.Errorf("unknown peer: %s", peer.Address.String())
} }
}, },
), ),
...@@ -383,6 +386,8 @@ func TestRetrievePreemptiveRetry(t *testing.T) { ...@@ -383,6 +386,8 @@ func TestRetrievePreemptiveRetry(t *testing.T) {
// (here one second more) // (here one second more)
server1ResponseDelayDuration := 6 * time.Second server1ResponseDelayDuration := 6 * time.Second
ranOnce := true
ranMux := sync.Mutex{}
recorder := streamtest.New( recorder := streamtest.New(
streamtest.WithProtocols( streamtest.WithProtocols(
server1.Protocol(), server1.Protocol(),
...@@ -391,17 +396,17 @@ func TestRetrievePreemptiveRetry(t *testing.T) { ...@@ -391,17 +396,17 @@ func TestRetrievePreemptiveRetry(t *testing.T) {
streamtest.WithMiddlewares( streamtest.WithMiddlewares(
func(h p2p.HandlerFunc) p2p.HandlerFunc { func(h p2p.HandlerFunc) p2p.HandlerFunc {
return func(ctx context.Context, peer p2p.Peer, stream p2p.Stream) error { return func(ctx context.Context, peer p2p.Peer, stream p2p.Stream) error {
if serverAddress1.Equal(peer.Address) { ranMux.Lock()
if ranOnce {
// NOTE: sleep time must be more than retry duration // NOTE: sleep time must be more than retry duration
ranOnce = false
ranMux.Unlock()
time.Sleep(server1ResponseDelayDuration) time.Sleep(server1ResponseDelayDuration)
return server1.Handler(ctx, peer, stream) return server1.Handler(ctx, peer, stream)
} }
if serverAddress2.Equal(peer.Address) { ranMux.Unlock()
return server2.Handler(ctx, peer, stream) return server2.Handler(ctx, peer, stream)
}
return fmt.Errorf("unknown peer: %s", peer.Address.String())
} }
}, },
), ),
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment