Commit a40d342b authored by Janos Guljas's avatar Janos Guljas

record handler error by streamtest Recorder

parent b9b3c996
...@@ -15,7 +15,7 @@ import ( ...@@ -15,7 +15,7 @@ import (
) )
type Recorder struct { type Recorder struct {
records map[string][]Record records map[string][]*Record
recordsMu sync.Mutex recordsMu sync.Mutex
protocols []p2p.ProtocolSpec protocols []p2p.ProtocolSpec
middlewares []p2p.HandlerMiddleware middlewares []p2p.HandlerMiddleware
...@@ -35,7 +35,7 @@ func WithMiddlewares(middlewares ...p2p.HandlerMiddleware) Option { ...@@ -35,7 +35,7 @@ func WithMiddlewares(middlewares ...p2p.HandlerMiddleware) Option {
func New(opts ...Option) *Recorder { func New(opts ...Option) *Recorder {
r := &Recorder{ r := &Recorder{
records: make(map[string][]Record), records: make(map[string][]*Record),
} }
for _, o := range opts { for _, o := range opts {
o.apply(r) o.apply(r)
...@@ -43,7 +43,7 @@ func New(opts ...Option) *Recorder { ...@@ -43,7 +43,7 @@ func New(opts ...Option) *Recorder {
return r return r
} }
func (r *Recorder) NewStream(_ context.Context, overlay swarm.Address, protocolName, streamName, version string) (p2p.Stream, error) { func (r *Recorder) NewStream(_ context.Context, addr swarm.Address, protocolName, streamName, version string) (p2p.Stream, error) {
recordIn := newRecord() recordIn := newRecord()
recordOut := newRecord() recordOut := newRecord()
streamOut := newStream(recordIn, recordOut) streamOut := newStream(recordIn, recordOut)
...@@ -65,37 +65,39 @@ func (r *Recorder) NewStream(_ context.Context, overlay swarm.Address, protocolN ...@@ -65,37 +65,39 @@ func (r *Recorder) NewStream(_ context.Context, overlay swarm.Address, protocolN
for _, m := range r.middlewares { for _, m := range r.middlewares {
handler = m(handler) handler = m(handler)
} }
record := &Record{in: recordIn, out: recordOut}
go func() { go func() {
if err := handler(p2p.Peer{Address: overlay}, streamIn); err != nil { err := handler(p2p.Peer{Address: addr}, streamIn)
panic(err) // todo: store error and export error records for inspection record.setErr(err)
}
}() }()
id := overlay.String() + p2p.NewSwarmStreamName(protocolName, streamName, version) id := addr.String() + p2p.NewSwarmStreamName(protocolName, streamName, version)
r.recordsMu.Lock() r.recordsMu.Lock()
defer r.recordsMu.Unlock() defer r.recordsMu.Unlock()
r.records[id] = append(r.records[id], Record{in: recordIn, out: recordOut}) r.records[id] = append(r.records[id], record)
return streamOut, nil return streamOut, nil
} }
func (r *Recorder) Records(peerID, protocolName, streamName, version string) ([]Record, error) { func (r *Recorder) Records(addr swarm.Address, protocolName, streamName, version string) ([]*Record, error) {
id := peerID + p2p.NewSwarmStreamName(protocolName, streamName, version) id := addr.String() + p2p.NewSwarmStreamName(protocolName, streamName, version)
r.recordsMu.Lock() r.recordsMu.Lock()
defer r.recordsMu.Unlock() defer r.recordsMu.Unlock()
records, ok := r.records[id] records, ok := r.records[id]
if !ok { if !ok {
return nil, fmt.Errorf("records not found for %q %q %q %q", peerID, protocolName, streamName, version) return nil, fmt.Errorf("records not found for %q %q %q %q", addr, protocolName, streamName, version)
} }
return records, nil return records, nil
} }
type Record struct { type Record struct {
in *record in *record
out *record out *record
err error
errMu sync.Mutex
} }
func (r *Record) In() []byte { func (r *Record) In() []byte {
...@@ -106,6 +108,20 @@ func (r *Record) Out() []byte { ...@@ -106,6 +108,20 @@ func (r *Record) Out() []byte {
return r.out.bytes() return r.out.bytes()
} }
func (r *Record) Err() error {
r.errMu.Lock()
defer r.errMu.Unlock()
return r.err
}
func (r *Record) setErr(err error) {
r.errMu.Lock()
defer r.errMu.Unlock()
r.err = err
}
type stream struct { type stream struct {
in io.WriteCloser in io.WriteCloser
out io.ReadCloser out io.ReadCloser
......
...@@ -51,10 +51,9 @@ func TestPing(t *testing.T) { ...@@ -51,10 +51,9 @@ func TestPing(t *testing.T) {
}) })
// ping // ping
peerID := "ca1e9f3938cc1425c6061b96ad9eb93e134dfe8734ad490164ef20af9d1cf59c" addr := swarm.MustParseHexAddress("ca1e9f3938cc1425c6061b96ad9eb93e134dfe8734ad490164ef20af9d1cf59c")
peerIDAddress := swarm.MustParseHexAddress(peerID)
greetings := []string{"hey", "there", "fella"} greetings := []string{"hey", "there", "fella"}
rtt, err := client.Ping(context.Background(), peerIDAddress, greetings...) rtt, err := client.Ping(context.Background(), addr, greetings...)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
...@@ -65,7 +64,7 @@ func TestPing(t *testing.T) { ...@@ -65,7 +64,7 @@ func TestPing(t *testing.T) {
} }
// get a record for this stream // get a record for this stream
records, err := recorder.Records(peerID, "pingpong", "pingpong", "1.0.0") records, err := recorder.Records(addr, "pingpong", "pingpong", "1.0.0")
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
...@@ -110,4 +109,8 @@ func TestPing(t *testing.T) { ...@@ -110,4 +109,8 @@ func TestPing(t *testing.T) {
if fmt.Sprint(gotResponses) != fmt.Sprint(wantResponses) { if fmt.Sprint(gotResponses) != fmt.Sprint(wantResponses) {
t.Errorf("got responses %v, want %v", gotResponses, wantResponses) t.Errorf("got responses %v, want %v", gotResponses, wantResponses)
} }
if err := record.Err(); err != nil {
t.Fatal(err)
}
} }
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment