Commit 35e9e468 authored by acud's avatar acud Committed by GitHub

add noop middleware to recorder (#217)

parent bd44353e
...@@ -22,6 +22,10 @@ var ( ...@@ -22,6 +22,10 @@ var (
ErrStreamFullcloseTimeout = errors.New("fullclose timeout") ErrStreamFullcloseTimeout = errors.New("fullclose timeout")
fullCloseTimeout = fullCloseTimeoutDefault // timeout of fullclose fullCloseTimeout = fullCloseTimeoutDefault // timeout of fullclose
fullCloseTimeoutDefault = 5 * time.Second // default timeout used for helper function to reset timeout when changed fullCloseTimeoutDefault = 5 * time.Second // default timeout used for helper function to reset timeout when changed
noopMiddleware = func(f p2p.HandlerFunc) p2p.HandlerFunc {
return f
}
) )
type Recorder struct { type Recorder struct {
...@@ -47,6 +51,9 @@ func New(opts ...Option) *Recorder { ...@@ -47,6 +51,9 @@ func New(opts ...Option) *Recorder {
r := &Recorder{ r := &Recorder{
records: make(map[string][]*Record), records: make(map[string][]*Record),
} }
r.middlewares = append(r.middlewares, noopMiddleware)
for _, o := range opts { for _, o := range opts {
o.apply(r) o.apply(r)
} }
...@@ -118,7 +125,7 @@ func (r *Recorder) Records(addr swarm.Address, protocolName, protocolVersio, str ...@@ -118,7 +125,7 @@ func (r *Recorder) Records(addr swarm.Address, protocolName, protocolVersio, str
// WaitRecords waits for some time for records to come into the recorder. If msgs is 0, the timeoutSec period is waited to verify // WaitRecords waits for some time for records to come into the recorder. If msgs is 0, the timeoutSec period is waited to verify
// that _no_ messages arrive during this time period. // that _no_ messages arrive during this time period.
func (r *Recorder) WaitRecords(t *testing.T, addr swarm.Address, proto, version, stream string, msgs int, timeoutSec int) []*Record { func (r *Recorder) WaitRecords(t *testing.T, addr swarm.Address, proto, version, stream string, msgs, timeoutSec int) []*Record {
t.Helper() t.Helper()
wait := 10 * time.Millisecond wait := 10 * time.Millisecond
iters := int((time.Duration(timeoutSec) * time.Second) / wait) iters := int((time.Duration(timeoutSec) * time.Second) / wait)
......
...@@ -12,7 +12,6 @@ import ( ...@@ -12,7 +12,6 @@ import (
"github.com/ethersphere/bee/pkg/localstore" "github.com/ethersphere/bee/pkg/localstore"
"github.com/ethersphere/bee/pkg/logging" "github.com/ethersphere/bee/pkg/logging"
"github.com/ethersphere/bee/pkg/p2p"
"github.com/ethersphere/bee/pkg/p2p/protobuf" "github.com/ethersphere/bee/pkg/p2p/protobuf"
"github.com/ethersphere/bee/pkg/p2p/streamtest" "github.com/ethersphere/bee/pkg/p2p/streamtest"
"github.com/ethersphere/bee/pkg/pushsync" "github.com/ethersphere/bee/pkg/pushsync"
...@@ -39,12 +38,7 @@ func TestSendChunkAndReceiveReceipt(t *testing.T) { ...@@ -39,12 +38,7 @@ func TestSendChunkAndReceiveReceipt(t *testing.T) {
psPeer, storerPeer := createPushSyncNode(t, closestPeer, nil, mock.WithClosestPeerErr(topology.ErrWantSelf)) psPeer, storerPeer := createPushSyncNode(t, closestPeer, nil, mock.WithClosestPeerErr(topology.ErrWantSelf))
defer storerPeer.Close() defer storerPeer.Close()
recorder := streamtest.New( recorder := streamtest.New(streamtest.WithProtocols(psPeer.Protocol()))
streamtest.WithProtocols(psPeer.Protocol()),
streamtest.WithMiddlewares(func(f p2p.HandlerFunc) p2p.HandlerFunc {
return f
}),
)
// pivot node needs the streamer since the chunk is intercepted by // pivot node needs the streamer since the chunk is intercepted by
// the chunk worker, then gets sent by opening a new stream // the chunk worker, then gets sent by opening a new stream
...@@ -90,23 +84,13 @@ func TestHandler(t *testing.T) { ...@@ -90,23 +84,13 @@ func TestHandler(t *testing.T) {
psClosestPeer, closestStorerPeerDB := createPushSyncNode(t, closestPeer, nil, mock.WithClosestPeerErr(topology.ErrWantSelf)) psClosestPeer, closestStorerPeerDB := createPushSyncNode(t, closestPeer, nil, mock.WithClosestPeerErr(topology.ErrWantSelf))
defer closestStorerPeerDB.Close() defer closestStorerPeerDB.Close()
closestRecorder := streamtest.New( closestRecorder := streamtest.New(streamtest.WithProtocols(psClosestPeer.Protocol()))
streamtest.WithProtocols(psClosestPeer.Protocol()),
streamtest.WithMiddlewares(func(f p2p.HandlerFunc) p2p.HandlerFunc {
return f
}),
)
// creating the pivot peer // creating the pivot peer
psPivot, storerPivotDB := createPushSyncNode(t, pivotPeer, closestRecorder, mock.WithClosestPeer(closestPeer)) psPivot, storerPivotDB := createPushSyncNode(t, pivotPeer, closestRecorder, mock.WithClosestPeer(closestPeer))
defer storerPivotDB.Close() defer storerPivotDB.Close()
pivotRecorder := streamtest.New( pivotRecorder := streamtest.New(streamtest.WithProtocols(psPivot.Protocol()))
streamtest.WithProtocols(psPivot.Protocol()),
streamtest.WithMiddlewares(func(f p2p.HandlerFunc) p2p.HandlerFunc {
return f
}),
)
// Creating the trigger peer // Creating the trigger peer
psTriggerPeer, triggerStorerDB := createPushSyncNode(t, triggerPeer, pivotRecorder, mock.WithClosestPeer(pivotPeer)) psTriggerPeer, triggerStorerDB := createPushSyncNode(t, triggerPeer, pivotRecorder, mock.WithClosestPeer(pivotPeer))
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment