Commit 6c435112 authored by Janoš Guljaš's avatar Janoš Guljaš Committed by GitHub

blocklist peer on retrieval timeout (#826)

parent 034dfb4f
...@@ -19,13 +19,17 @@ type Service interface { ...@@ -19,13 +19,17 @@ type Service interface {
AddProtocol(ProtocolSpec) error AddProtocol(ProtocolSpec) error
// Connect to a peer but do not notify topology about the established connection. // Connect to a peer but do not notify topology about the established connection.
Connect(ctx context.Context, addr ma.Multiaddr) (address *bzz.Address, err error) Connect(ctx context.Context, addr ma.Multiaddr) (address *bzz.Address, err error)
Disconnecter
Peers() []Peer
Addresses() ([]ma.Multiaddr, error)
SetNotifier(Notifier)
}
type Disconnecter interface {
Disconnect(overlay swarm.Address) error Disconnect(overlay swarm.Address) error
// Blocklist will disconnect a peer and put it on a blocklist (blocking in & out connections) for provided duration // Blocklist will disconnect a peer and put it on a blocklist (blocking in & out connections) for provided duration
// duration 0 is treated as an infinite duration // duration 0 is treated as an infinite duration
Blocklist(overlay swarm.Address, duration time.Duration) error Blocklist(overlay swarm.Address, duration time.Duration) error
Peers() []Peer
Addresses() ([]ma.Multiaddr, error)
SetNotifier(Notifier)
} }
type Notifier interface { type Notifier interface {
...@@ -45,6 +49,11 @@ type Streamer interface { ...@@ -45,6 +49,11 @@ type Streamer interface {
NewStream(ctx context.Context, address swarm.Address, h Headers, protocol, version, stream string) (Stream, error) NewStream(ctx context.Context, address swarm.Address, h Headers, protocol, version, stream string) (Stream, error)
} }
type StreamerDisconnecter interface {
Streamer
Disconnecter
}
// Stream represent a bidirectional data Stream. // Stream represent a bidirectional data Stream.
type Stream interface { type Stream interface {
io.ReadWriter io.ReadWriter
......
...@@ -319,3 +319,52 @@ type Option interface { ...@@ -319,3 +319,52 @@ type Option interface {
type optionFunc func(*Recorder) type optionFunc func(*Recorder)
func (f optionFunc) apply(r *Recorder) { f(r) } func (f optionFunc) apply(r *Recorder) { f(r) }
var _ p2p.StreamerDisconnecter = (*RecorderDisconnecter)(nil)
type RecorderDisconnecter struct {
*Recorder
disconnected map[string]struct{}
blocklisted map[string]time.Duration
mu sync.RWMutex
}
func NewRecorderDisconnecter(r *Recorder) *RecorderDisconnecter {
return &RecorderDisconnecter{
Recorder: r,
disconnected: make(map[string]struct{}),
blocklisted: make(map[string]time.Duration),
}
}
func (r *RecorderDisconnecter) Disconnect(overlay swarm.Address) error {
r.mu.Lock()
defer r.mu.Unlock()
r.disconnected[overlay.String()] = struct{}{}
return nil
}
func (r *RecorderDisconnecter) Blocklist(overlay swarm.Address, d time.Duration) error {
r.mu.Lock()
defer r.mu.Unlock()
r.blocklisted[overlay.String()] = d
return nil
}
func (r *RecorderDisconnecter) IsDisconnected(overlay swarm.Address) bool {
r.mu.RLock()
defer r.mu.RUnlock()
_, yes := r.disconnected[overlay.String()]
return yes
}
func (r *RecorderDisconnecter) IsBlocklisted(overlay swarm.Address) (bool, time.Duration) {
r.mu.RLock()
defer r.mu.RUnlock()
d, yes := r.blocklisted[overlay.String()]
return yes, d
}
...@@ -227,9 +227,9 @@ func newTestNetStore(t *testing.T, recoveryFunc recovery.RecoveryHook) storage.S ...@@ -227,9 +227,9 @@ func newTestNetStore(t *testing.T, recoveryFunc recovery.RecoveryHook) storage.S
return nil return nil
}} }}
server := retrieval.New(swarm.ZeroAddress, mockStorer, nil, ps, logger, serverMockAccounting, nil, nil, nil) server := retrieval.New(swarm.ZeroAddress, mockStorer, nil, ps, logger, serverMockAccounting, nil, nil, nil)
recorder := streamtest.New( recorder := streamtest.NewRecorderDisconnecter(streamtest.New(
streamtest.WithProtocols(server.Protocol()), streamtest.WithProtocols(server.Protocol()),
) ))
retrieve := retrieval.New(swarm.ZeroAddress, mockStorer, recorder, ps, logger, serverMockAccounting, pricerMock, nil, nil) retrieve := retrieval.New(swarm.ZeroAddress, mockStorer, recorder, ps, logger, serverMockAccounting, pricerMock, nil, nil)
ns := netstore.New(storer, recoveryFunc, retrieve, logger, nil) ns := netstore.New(storer, recoveryFunc, retrieve, logger, nil)
return ns return ns
......
...@@ -39,7 +39,7 @@ type Interface interface { ...@@ -39,7 +39,7 @@ type Interface interface {
type Service struct { type Service struct {
addr swarm.Address addr swarm.Address
streamer p2p.Streamer streamer p2p.StreamerDisconnecter
peerSuggester topology.EachPeerer peerSuggester topology.EachPeerer
storer storage.Storer storer storage.Storer
singleflight singleflight.Group singleflight singleflight.Group
...@@ -50,7 +50,7 @@ type Service struct { ...@@ -50,7 +50,7 @@ type Service struct {
tracer *tracing.Tracer tracer *tracing.Tracer
} }
func New(addr swarm.Address, storer storage.Storer, streamer p2p.Streamer, chunkPeerer topology.EachPeerer, logger logging.Logger, accounting accounting.Interface, pricer accounting.Pricer, validator swarm.Validator, tracer *tracing.Tracer) *Service { func New(addr swarm.Address, storer storage.Storer, streamer p2p.StreamerDisconnecter, chunkPeerer topology.EachPeerer, logger logging.Logger, accounting accounting.Interface, pricer accounting.Pricer, validator swarm.Validator, tracer *tracing.Tracer) *Service {
return &Service{ return &Service{
addr: addr, addr: addr,
streamer: streamer, streamer: streamer,
...@@ -80,6 +80,7 @@ func (s *Service) Protocol() p2p.ProtocolSpec { ...@@ -80,6 +80,7 @@ func (s *Service) Protocol() p2p.ProtocolSpec {
const ( const (
maxPeers = 5 maxPeers = 5
retrieveChunkTimeout = 10 * time.Second retrieveChunkTimeout = 10 * time.Second
blocklistDuration = time.Minute
) )
func (s *Service) RetrieveChunk(ctx context.Context, addr swarm.Address) (swarm.Chunk, error) { func (s *Service) RetrieveChunk(ctx context.Context, addr swarm.Address) (swarm.Chunk, error) {
...@@ -100,6 +101,14 @@ func (s *Service) RetrieveChunk(ctx context.Context, addr swarm.Address) (swarm. ...@@ -100,6 +101,14 @@ func (s *Service) RetrieveChunk(ctx context.Context, addr swarm.Address) (swarm.
} }
logger.Debugf("retrieval: failed to get chunk %s from peer %s: %v", addr, peer, err) logger.Debugf("retrieval: failed to get chunk %s from peer %s: %v", addr, peer, err)
skipPeers = append(skipPeers, peer) skipPeers = append(skipPeers, peer)
if errors.Is(err, context.DeadlineExceeded) {
if err := s.streamer.Blocklist(peer, blocklistDuration); err != nil {
s.logger.Errorf("retrieval: unable to block peer %s", peer)
s.logger.Debugf("retrieval: blocking peer %s: %v", peer, err)
} else {
s.logger.Warningf("retrieval: peer %s blocked as unresponsive", peer)
}
}
continue continue
} }
logger.Tracef("retrieval: got chunk %s from peer %s", addr, peer) logger.Tracef("retrieval: got chunk %s from peer %s", addr, peer)
...@@ -253,8 +262,8 @@ func (s *Service) handler(ctx context.Context, p p2p.Peer, stream p2p.Stream) (e ...@@ -253,8 +262,8 @@ func (s *Service) handler(ctx context.Context, p p2p.Peer, stream p2p.Stream) (e
} }
}() }()
var req pb.Request var req pb.Request
if err := r.ReadMsg(&req); err != nil { if err := r.ReadMsgWithContext(ctx, &req); err != nil {
return fmt.Errorf("read request: %w peer %s", err, p.Address.String()) return fmt.Errorf("read request: %w", err)
} }
span, _, ctx := s.tracer.StartSpanFromContext(ctx, "handle-retrieve-chunk", s.logger, opentracing.Tag{Key: "address", Value: swarm.NewAddress(req.Addr).String()}) span, _, ctx := s.tracer.StartSpanFromContext(ctx, "handle-retrieve-chunk", s.logger, opentracing.Tag{Key: "address", Value: swarm.NewAddress(req.Addr).String()})
defer span.Finish() defer span.Finish()
...@@ -277,7 +286,7 @@ func (s *Service) handler(ctx context.Context, p p2p.Peer, stream p2p.Stream) (e ...@@ -277,7 +286,7 @@ func (s *Service) handler(ctx context.Context, p p2p.Peer, stream p2p.Stream) (e
if err := w.WriteMsgWithContext(ctx, &pb.Delivery{ if err := w.WriteMsgWithContext(ctx, &pb.Delivery{
Data: chunk.Data(), Data: chunk.Data(),
}); err != nil { }); err != nil {
return fmt.Errorf("write delivery: %w peer %s", err, p.Address.String()) return fmt.Errorf("write delivery: %w", err)
} }
// compute the price we charge for this chunk and debit it from p's balance // compute the price we charge for this chunk and debit it from p's balance
......
...@@ -52,9 +52,9 @@ func TestDelivery(t *testing.T) { ...@@ -52,9 +52,9 @@ func TestDelivery(t *testing.T) {
// create the server that will handle the request and will serve the response // create the server that will handle the request and will serve the response
server := retrieval.New(swarm.MustParseHexAddress("00112234"), mockStorer, nil, nil, logger, serverMockAccounting, pricerMock, mockValidator, nil) server := retrieval.New(swarm.MustParseHexAddress("00112234"), mockStorer, nil, nil, logger, serverMockAccounting, pricerMock, mockValidator, nil)
recorder := streamtest.New( recorder := streamtest.NewRecorderDisconnecter(streamtest.New(
streamtest.WithProtocols(server.Protocol()), streamtest.WithProtocols(server.Protocol()),
) ))
clientMockAccounting := accountingmock.NewAccounting() clientMockAccounting := accountingmock.NewAccounting()
...@@ -153,7 +153,7 @@ func TestRetrieveChunk(t *testing.T) { ...@@ -153,7 +153,7 @@ func TestRetrieveChunk(t *testing.T) {
server := retrieval.New(serverAddress, serverStorer, nil, nil, logger, accountingmock.NewAccounting(), pricer, mockValidator, nil) server := retrieval.New(serverAddress, serverStorer, nil, nil, logger, accountingmock.NewAccounting(), pricer, mockValidator, nil)
recorder := streamtest.New(streamtest.WithProtocols(server.Protocol())) recorder := streamtest.NewRecorderDisconnecter(streamtest.New(streamtest.WithProtocols(server.Protocol())))
clientSuggester := mockPeerSuggester{eachPeerRevFunc: func(f topology.EachPeerFunc) error { clientSuggester := mockPeerSuggester{eachPeerRevFunc: func(f topology.EachPeerFunc) error {
_, _, _ = f(serverAddress, 0) _, _, _ = f(serverAddress, 0)
...@@ -198,7 +198,7 @@ func TestRetrieveChunk(t *testing.T) { ...@@ -198,7 +198,7 @@ func TestRetrieveChunk(t *testing.T) {
forwarder := retrieval.New( forwarder := retrieval.New(
forwarderAddress, forwarderAddress,
storemock.NewStorer(), // no chunk in forwarder's store storemock.NewStorer(), // no chunk in forwarder's store
streamtest.New(streamtest.WithProtocols(server.Protocol())), // connect to server streamtest.NewRecorderDisconnecter(streamtest.New(streamtest.WithProtocols(server.Protocol()))), // connect to server
mockPeerSuggester{eachPeerRevFunc: func(f topology.EachPeerFunc) error { mockPeerSuggester{eachPeerRevFunc: func(f topology.EachPeerFunc) error {
_, _, _ = f(serverAddress, 0) // suggest server's address _, _, _ = f(serverAddress, 0) // suggest server's address
return nil return nil
...@@ -213,7 +213,7 @@ func TestRetrieveChunk(t *testing.T) { ...@@ -213,7 +213,7 @@ func TestRetrieveChunk(t *testing.T) {
client := retrieval.New( client := retrieval.New(
clientAddress, clientAddress,
storemock.NewStorer(), // no chunk in clients's store storemock.NewStorer(), // no chunk in clients's store
streamtest.New(streamtest.WithProtocols(forwarder.Protocol())), // connect to forwarder streamtest.NewRecorderDisconnecter(streamtest.New(streamtest.WithProtocols(forwarder.Protocol()))), // connect to forwarder
mockPeerSuggester{eachPeerRevFunc: func(f topology.EachPeerFunc) error { mockPeerSuggester{eachPeerRevFunc: func(f topology.EachPeerFunc) error {
_, _, _ = f(forwarderAddress, 0) // suggest forwarder's address _, _, _ = f(forwarderAddress, 0) // suggest forwarder's address
return nil return nil
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment