Commit 24059282 authored by Janos Guljas's avatar Janos Guljas

update p2p.HandlerFunc to return error

parent 34c81c30
...@@ -270,7 +270,9 @@ func (s *Service) AddProtocol(p p2p.ProtocolSpec) (err error) { ...@@ -270,7 +270,9 @@ func (s *Service) AddProtocol(p p2p.ProtocolSpec) (err error) {
} }
s.metrics.HandledStreamCount.Inc() s.metrics.HandledStreamCount.Inc()
ss.Handler(p2p.Peer{Address: overlay}, stream) if err := ss.Handler(p2p.Peer{Address: overlay}, stream); err != nil {
s.logger.Errorf("%s: %s/%s: %w", p.Name, ss.Name, ss.Version, err)
}
}) })
} }
return nil return nil
......
...@@ -48,7 +48,7 @@ func (r *Recorder) NewStream(_ context.Context, overlay, protocolName, streamNam ...@@ -48,7 +48,7 @@ func (r *Recorder) NewStream(_ context.Context, overlay, protocolName, streamNam
streamOut := newStream(recordIn, recordOut) streamOut := newStream(recordIn, recordOut)
streamIn := newStream(recordOut, recordIn) streamIn := newStream(recordOut, recordIn)
var handler func(p2p.Peer, p2p.Stream) var handler p2p.HandlerFunc
for _, p := range r.protocols { for _, p := range r.protocols {
if p.Name == protocolName { if p.Name == protocolName {
for _, s := range p.StreamSpecs { for _, s := range p.StreamSpecs {
...@@ -64,7 +64,11 @@ func (r *Recorder) NewStream(_ context.Context, overlay, protocolName, streamNam ...@@ -64,7 +64,11 @@ func (r *Recorder) NewStream(_ context.Context, overlay, protocolName, streamNam
for _, m := range r.middlewares { for _, m := range r.middlewares {
handler = m(handler) handler = m(handler)
} }
go handler(p2p.Peer{Address: overlay}, streamIn) go func() {
if err := handler(p2p.Peer{Address: overlay}, streamIn); err != nil {
panic(err) // todo: store error and export error records for inspection
}
}()
id := overlay + p2p.NewSwarmStreamName(protocolName, streamName, version) id := overlay + p2p.NewSwarmStreamName(protocolName, streamName, version)
......
...@@ -37,7 +37,7 @@ type StreamSpec struct { ...@@ -37,7 +37,7 @@ type StreamSpec struct {
Handler HandlerFunc Handler HandlerFunc
} }
type HandlerFunc func(Peer, Stream) type HandlerFunc func(Peer, Stream) error
type HandlerMiddleware func(HandlerFunc) HandlerFunc type HandlerMiddleware func(HandlerFunc) HandlerFunc
......
...@@ -34,7 +34,6 @@ type Options struct { ...@@ -34,7 +34,6 @@ type Options struct {
type Logger interface { type Logger interface {
Debugf(format string, args ...interface{}) Debugf(format string, args ...interface{})
Errorf(format string, args ...interface{})
} }
func New(o Options) *Service { func New(o Options) *Service {
...@@ -73,7 +72,7 @@ func (s *Service) Ping(ctx context.Context, address string, msgs ...string) (rtt ...@@ -73,7 +72,7 @@ func (s *Service) Ping(ctx context.Context, address string, msgs ...string) (rtt
if err := w.WriteMsg(&Ping{ if err := w.WriteMsg(&Ping{
Greeting: msg, Greeting: msg,
}); err != nil { }); err != nil {
return 0, fmt.Errorf("stream write: %w", err) return 0, fmt.Errorf("write message: %w", err)
} }
s.metrics.PingSentCount.Inc() s.metrics.PingSentCount.Inc()
...@@ -81,7 +80,7 @@ func (s *Service) Ping(ctx context.Context, address string, msgs ...string) (rtt ...@@ -81,7 +80,7 @@ func (s *Service) Ping(ctx context.Context, address string, msgs ...string) (rtt
if err == io.EOF { if err == io.EOF {
break break
} }
return 0, err return 0, fmt.Errorf("read message: %w", err)
} }
s.logger.Debugf("got pong: %q", pong.Response) s.logger.Debugf("got pong: %q", pong.Response)
...@@ -90,7 +89,7 @@ func (s *Service) Ping(ctx context.Context, address string, msgs ...string) (rtt ...@@ -90,7 +89,7 @@ func (s *Service) Ping(ctx context.Context, address string, msgs ...string) (rtt
return time.Since(start), nil return time.Since(start), nil
} }
func (s *Service) Handler(peer p2p.Peer, stream p2p.Stream) { func (s *Service) Handler(peer p2p.Peer, stream p2p.Stream) error {
w, r := protobuf.NewWriterAndReader(stream) w, r := protobuf.NewWriterAndReader(stream)
defer stream.Close() defer stream.Close()
...@@ -100,8 +99,7 @@ func (s *Service) Handler(peer p2p.Peer, stream p2p.Stream) { ...@@ -100,8 +99,7 @@ func (s *Service) Handler(peer p2p.Peer, stream p2p.Stream) {
if err == io.EOF { if err == io.EOF {
break break
} }
s.logger.Errorf("pingpong handler: read message: %v\n", err) return fmt.Errorf("read message: %w", err)
return
} }
s.logger.Debugf("got ping: %q", ping.Greeting) s.logger.Debugf("got ping: %q", ping.Greeting)
s.metrics.PingReceivedCount.Inc() s.metrics.PingReceivedCount.Inc()
...@@ -109,9 +107,9 @@ func (s *Service) Handler(peer p2p.Peer, stream p2p.Stream) { ...@@ -109,9 +107,9 @@ func (s *Service) Handler(peer p2p.Peer, stream p2p.Stream) {
if err := w.WriteMsg(&Pong{ if err := w.WriteMsg(&Pong{
Response: "{" + ping.Greeting + "}", Response: "{" + ping.Greeting + "}",
}); err != nil { }); err != nil {
s.logger.Errorf("pingpong handler: write message: %v\n", err) return fmt.Errorf("write message: %w", err)
return
} }
s.metrics.PongSentCount.Inc() s.metrics.PongSentCount.Inc()
} }
return nil
} }
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment