Commit badf9abf authored by Petar Radovic's avatar Petar Radovic Committed by GitHub

Stream reset initial implementation (#407)

* stream reset
parent 9451b877
...@@ -63,3 +63,7 @@ func (s *Stream) Close() error { ...@@ -63,3 +63,7 @@ func (s *Stream) Close() error {
func (s *Stream) FullClose() error { func (s *Stream) FullClose() error {
return nil return nil
} }
func (s *Stream) Reset() error {
return nil
}
...@@ -216,7 +216,7 @@ func New(ctx context.Context, signer beecrypto.Signer, networkID uint64, overlay ...@@ -216,7 +216,7 @@ func New(ctx context.Context, signer beecrypto.Signer, networkID uint64, overlay
if err != nil { if err != nil {
s.logger.Debugf("handshake: handle %s: %v", peerID, err) s.logger.Debugf("handshake: handle %s: %v", peerID, err)
s.logger.Errorf("unable to handshake with peer %v", peerID) s.logger.Errorf("unable to handshake with peer %v", peerID)
_ = stream.Reset() _ = handshakeStream.Reset()
_ = s.disconnect(peerID) _ = s.disconnect(peerID)
return return
} }
...@@ -376,7 +376,7 @@ func (s *Service) Connect(ctx context.Context, addr ma.Multiaddr) (address *bzz. ...@@ -376,7 +376,7 @@ func (s *Service) Connect(ctx context.Context, addr ma.Multiaddr) (address *bzz.
handshakeStream := NewStream(stream) handshakeStream := NewStream(stream)
i, err := s.handshakeService.Handshake(handshakeStream, stream.Conn().RemoteMultiaddr(), stream.Conn().RemotePeer()) i, err := s.handshakeService.Handshake(handshakeStream, stream.Conn().RemoteMultiaddr(), stream.Conn().RemotePeer())
if err != nil { if err != nil {
_ = stream.Reset() _ = handshakeStream.Reset()
_ = s.disconnect(info.ID) _ = s.disconnect(info.ID)
return nil, fmt.Errorf("handshake: %w", err) return nil, fmt.Errorf("handshake: %w", err)
} }
......
...@@ -35,6 +35,7 @@ type Stream interface { ...@@ -35,6 +35,7 @@ type Stream interface {
io.Closer io.Closer
Headers() Headers Headers() Headers
FullClose() error FullClose() error
Reset() error
} }
// ProtocolSpec defines a collection of Stream specifications with handlers. // ProtocolSpec defines a collection of Stream specifications with handlers.
......
...@@ -387,6 +387,10 @@ func (noopWriteCloser) FullClose() error { ...@@ -387,6 +387,10 @@ func (noopWriteCloser) FullClose() error {
return nil return nil
} }
func (noopWriteCloser) Reset() error {
return nil
}
type noopReadCloser struct { type noopReadCloser struct {
io.Writer io.Writer
} }
...@@ -410,3 +414,7 @@ func (noopReadCloser) Close() error { ...@@ -410,3 +414,7 @@ func (noopReadCloser) Close() error {
func (noopReadCloser) FullClose() error { func (noopReadCloser) FullClose() error {
return nil return nil
} }
func (noopReadCloser) Reset() error {
return nil
}
...@@ -235,6 +235,11 @@ func (s *stream) FullClose() error { ...@@ -235,6 +235,11 @@ func (s *stream) FullClose() error {
return nil return nil
} }
func (s *stream) Reset() error {
//todo: :implement appropriately after all protocols are migrated and tested
return s.Close()
}
type record struct { type record struct {
b []byte b []byte
c int c int
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment