Commit d3c286ce authored by Janos Guljas's avatar Janos Guljas

extend p2p/protobuf reader and writer with cancelation methods

parent 700e3ec0
...@@ -5,7 +5,10 @@ ...@@ -5,7 +5,10 @@
package protobuf package protobuf
import ( import (
"context"
"errors"
"io" "io"
"time"
"github.com/ethersphere/bee/pkg/p2p" "github.com/ethersphere/bee/pkg/p2p"
ggio "github.com/gogo/protobuf/io" ggio "github.com/gogo/protobuf/io"
...@@ -14,20 +17,22 @@ import ( ...@@ -14,20 +17,22 @@ import (
const delimitedReaderMaxSize = 128 * 1024 // max message size const delimitedReaderMaxSize = 128 * 1024 // max message size
var ErrTimeout = errors.New("timeout")
type Message = proto.Message type Message = proto.Message
func NewWriterAndReader(s p2p.Stream) (w ggio.Writer, r ggio.Reader) { func NewWriterAndReader(s p2p.Stream) (Writer, Reader) {
r = ggio.NewDelimitedReader(s, delimitedReaderMaxSize) w := ggio.NewDelimitedWriter(s)
w = ggio.NewDelimitedWriter(s) r := ggio.NewDelimitedReader(s, delimitedReaderMaxSize)
return w, r return newWriter(w), newReader(r)
} }
func NewReader(r io.Reader) ggio.Reader { func NewReader(r io.Reader) Reader {
return ggio.NewDelimitedReader(r, delimitedReaderMaxSize) return newReader(ggio.NewDelimitedReader(r, delimitedReaderMaxSize))
} }
func NewWriter(w io.Writer) ggio.Writer { func NewWriter(w io.Writer) ggio.Writer {
return ggio.NewDelimitedWriter(w) return newWriter(ggio.NewDelimitedWriter(w))
} }
func ReadMessages(r io.Reader, newMessage func() Message) (m []Message, err error) { func ReadMessages(r io.Reader, newMessage func() Message) (m []Message, err error) {
...@@ -44,3 +49,81 @@ func ReadMessages(r io.Reader, newMessage func() Message) (m []Message, err erro ...@@ -44,3 +49,81 @@ func ReadMessages(r io.Reader, newMessage func() Message) (m []Message, err erro
} }
return m, nil return m, nil
} }
type Reader struct {
ggio.Reader
}
func newReader(r ggio.Reader) Reader {
return Reader{Reader: r}
}
func (r Reader) ReadMsgWithContext(ctx context.Context, msg proto.Message) error {
errChan := make(chan error, 1)
go func() {
errChan <- r.ReadMsg(msg)
}()
select {
case err := <-errChan:
return err
case <-ctx.Done():
return ctx.Err()
}
}
func (r Reader) ReadMsgWithTimeout(d time.Duration, msg proto.Message) error {
errChan := make(chan error, 1)
go func() {
errChan <- r.ReadMsg(msg)
}()
timer := time.NewTimer(d)
defer timer.Stop()
select {
case err := <-errChan:
return err
case <-timer.C:
return ErrTimeout
}
}
type Writer struct {
ggio.Writer
}
func newWriter(r ggio.Writer) Writer {
return Writer{Writer: r}
}
func (w Writer) WriteMsgWithContext(ctx context.Context, msg proto.Message) error {
errChan := make(chan error, 1)
go func() {
errChan <- w.WriteMsg(msg)
}()
select {
case err := <-errChan:
return err
case <-ctx.Done():
return ctx.Err()
}
}
func (w Writer) WriteMsgWithTimeout(d time.Duration, msg proto.Message) error {
errChan := make(chan error, 1)
go func() {
errChan <- w.WriteMsg(msg)
}()
timer := time.NewTimer(d)
defer timer.Stop()
select {
case err := <-errChan:
return err
case <-timer.C:
return ErrTimeout
}
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment