Commit 78848e99 authored by acud's avatar acud Committed by GitHub

Revert "blocklist peer on retrieval timeout (#826)" (#1009)

* Revert "blocklist peer on retrieval timeout (#826)"

This reverts commit 6c435112.
parent b7796c6d
......@@ -226,9 +226,9 @@ func newTestNetStore(t *testing.T, recoveryFunc recovery.Callback) storage.Store
return nil
}}
server := retrieval.New(swarm.ZeroAddress, mockStorer, nil, ps, logger, serverMockAccounting, nil, nil, nil)
recorder := streamtest.NewRecorderDisconnecter(streamtest.New(
recorder := streamtest.New(
streamtest.WithProtocols(server.Protocol()),
))
)
retrieve := retrieval.New(swarm.ZeroAddress, mockStorer, recorder, ps, logger, serverMockAccounting, pricerMock, nil, nil)
ns := netstore.New(storer, recoveryFunc, retrieve, logger)
return ns
......
......@@ -39,7 +39,7 @@ type Interface interface {
type Service struct {
addr swarm.Address
streamer p2p.StreamerDisconnecter
streamer p2p.Streamer
peerSuggester topology.EachPeerer
storer storage.Storer
singleflight singleflight.Group
......@@ -50,7 +50,7 @@ type Service struct {
tracer *tracing.Tracer
}
func New(addr swarm.Address, storer storage.Storer, streamer p2p.StreamerDisconnecter, chunkPeerer topology.EachPeerer, logger logging.Logger, accounting accounting.Interface, pricer accounting.Pricer, validator swarm.Validator, tracer *tracing.Tracer) *Service {
func New(addr swarm.Address, storer storage.Storer, streamer p2p.Streamer, chunkPeerer topology.EachPeerer, logger logging.Logger, accounting accounting.Interface, pricer accounting.Pricer, validator swarm.Validator, tracer *tracing.Tracer) *Service {
return &Service{
addr: addr,
streamer: streamer,
......@@ -80,7 +80,6 @@ func (s *Service) Protocol() p2p.ProtocolSpec {
const (
maxPeers = 5
retrieveChunkTimeout = 10 * time.Second
blocklistDuration = time.Minute
)
func (s *Service) RetrieveChunk(ctx context.Context, addr swarm.Address) (swarm.Chunk, error) {
......@@ -110,14 +109,6 @@ func (s *Service) RetrieveChunk(ctx context.Context, addr swarm.Address) (swarm.
}
logger.Debugf("retrieval: failed to get chunk %s from peer %s: %v", addr, peer, err)
skipPeers = append(skipPeers, peer)
if errors.Is(err, context.DeadlineExceeded) {
if err := s.streamer.Blocklist(peer, blocklistDuration); err != nil {
s.logger.Errorf("retrieval: unable to block peer %s", peer)
s.logger.Debugf("retrieval: blocking peer %s: %v", peer, err)
} else {
s.logger.Warningf("retrieval: peer %s blocked as unresponsive", peer)
}
}
continue
}
logger.Tracef("retrieval: got chunk %s from peer %s", addr, peer)
......@@ -272,7 +263,7 @@ func (s *Service) handler(ctx context.Context, p p2p.Peer, stream p2p.Stream) (e
}()
var req pb.Request
if err := r.ReadMsgWithContext(ctx, &req); err != nil {
return fmt.Errorf("read request: %w", err)
return fmt.Errorf("read request: %w peer %s", err, p.Address.String())
}
span, _, ctx := s.tracer.StartSpanFromContext(ctx, "handle-retrieve-chunk", s.logger, opentracing.Tag{Key: "address", Value: swarm.NewAddress(req.Addr).String()})
defer span.Finish()
......@@ -295,7 +286,7 @@ func (s *Service) handler(ctx context.Context, p p2p.Peer, stream p2p.Stream) (e
if err := w.WriteMsgWithContext(ctx, &pb.Delivery{
Data: chunk.Data(),
}); err != nil {
return fmt.Errorf("write delivery: %w", err)
return fmt.Errorf("write delivery: %w peer %s", err, p.Address.String())
}
s.logger.Tracef("retrieval protocol debiting peer %s", p.Address.String())
......
......@@ -52,9 +52,9 @@ func TestDelivery(t *testing.T) {
// create the server that will handle the request and will serve the response
server := retrieval.New(swarm.MustParseHexAddress("00112234"), mockStorer, nil, nil, logger, serverMockAccounting, pricerMock, mockValidator, nil)
recorder := streamtest.NewRecorderDisconnecter(streamtest.New(
recorder := streamtest.New(
streamtest.WithProtocols(server.Protocol()),
))
)
clientMockAccounting := accountingmock.NewAccounting()
......@@ -153,7 +153,7 @@ func TestRetrieveChunk(t *testing.T) {
server := retrieval.New(serverAddress, serverStorer, nil, nil, logger, accountingmock.NewAccounting(), pricer, mockValidator, nil)
recorder := streamtest.NewRecorderDisconnecter(streamtest.New(streamtest.WithProtocols(server.Protocol())))
recorder := streamtest.New(streamtest.WithProtocols(server.Protocol()))
clientSuggester := mockPeerSuggester{eachPeerRevFunc: func(f topology.EachPeerFunc) error {
_, _, _ = f(serverAddress, 0)
......@@ -198,7 +198,7 @@ func TestRetrieveChunk(t *testing.T) {
forwarder := retrieval.New(
forwarderAddress,
storemock.NewStorer(), // no chunk in forwarder's store
streamtest.NewRecorderDisconnecter(streamtest.New(streamtest.WithProtocols(server.Protocol()))), // connect to server
streamtest.New(streamtest.WithProtocols(server.Protocol())), // connect to server
mockPeerSuggester{eachPeerRevFunc: func(f topology.EachPeerFunc) error {
_, _, _ = f(serverAddress, 0) // suggest server's address
return nil
......@@ -213,7 +213,7 @@ func TestRetrieveChunk(t *testing.T) {
client := retrieval.New(
clientAddress,
storemock.NewStorer(), // no chunk in clients's store
streamtest.NewRecorderDisconnecter(streamtest.New(streamtest.WithProtocols(forwarder.Protocol()))), // connect to forwarder
streamtest.New(streamtest.WithProtocols(forwarder.Protocol())), // connect to forwarder
mockPeerSuggester{eachPeerRevFunc: func(f topology.EachPeerFunc) error {
_, _, _ = f(forwarderAddress, 0) // suggest forwarder's address
return nil
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment