Commit 25d9b009 authored by Janoš Guljaš's avatar Janoš Guljaš Committed by GitHub

Revert "blocklist peer on retrieval timeout (#685)" (#692)

This reverts commit c9bb6556.
parent c9bb6556
......@@ -238,7 +238,7 @@ func NewBee(addr string, swarmAddress swarm.Address, keystore keystore.Service,
chunkvalidator := swarm.NewChunkValidator(soc.NewValidator(), content.NewValidator())
retrieve := retrieval.New(p2ps, storer, kad, logger, acc, accounting.NewFixedPricer(swarmAddress, 10), chunkvalidator)
retrieve := retrieval.New(p2ps, kad, logger, acc, accounting.NewFixedPricer(swarmAddress, 10), chunkvalidator)
tagg := tags.NewTags(stateStore, logger)
b.tagsCloser = tagg
......@@ -257,6 +257,7 @@ func NewBee(addr string, swarmAddress swarm.Address, keystore keystore.Service,
} else {
ns = netstore.New(storer, nil, retrieve, logger, chunkvalidator)
}
retrieve.SetStorer(ns)
pushSyncProtocol := pushsync.New(p2ps, storer, kad, tagg, psss.TryUnwrap, logger, acc, accounting.NewFixedPricer(swarmAddress, 10))
......
......@@ -20,17 +20,13 @@ type Service interface {
AddProtocol(ProtocolSpec) error
// Connect to a peer but do not notify topology about the established connection.
Connect(ctx context.Context, addr ma.Multiaddr) (address *bzz.Address, err error)
Disconnecter
Peers() []Peer
AddNotifier(topology.Notifier)
Addresses() ([]ma.Multiaddr, error)
}
type Disconnecter interface {
Disconnect(overlay swarm.Address) error
// Blocklist will disconnect a peer and put it on a blocklist (blocking in & out connections) for provided duration
// duration 0 is treated as an infinite duration
Blocklist(overlay swarm.Address, duration time.Duration) error
Peers() []Peer
AddNotifier(topology.Notifier)
Addresses() ([]ma.Multiaddr, error)
}
// DebugService extends the Service with method used for debugging.
......@@ -45,11 +41,6 @@ type Streamer interface {
NewStream(ctx context.Context, address swarm.Address, h Headers, protocol, version, stream string) (Stream, error)
}
type StreamerDisconnecter interface {
Streamer
Disconnecter
}
// Stream represent a bidirectional data Stream.
type Stream interface {
io.ReadWriter
......
......@@ -319,52 +319,3 @@ type Option interface {
type optionFunc func(*Recorder)
func (f optionFunc) apply(r *Recorder) { f(r) }
var _ p2p.StreamerDisconnecter = (*RecorderDisconnecter)(nil)
type RecorderDisconnecter struct {
*Recorder
disconnected map[string]struct{}
blocklisted map[string]time.Duration
mu sync.RWMutex
}
func NewRecorderDisconnecter(r *Recorder) *RecorderDisconnecter {
return &RecorderDisconnecter{
Recorder: r,
disconnected: make(map[string]struct{}),
blocklisted: make(map[string]time.Duration),
}
}
func (r *RecorderDisconnecter) Disconnect(overlay swarm.Address) error {
r.mu.Lock()
defer r.mu.Unlock()
r.disconnected[overlay.String()] = struct{}{}
return nil
}
func (r *RecorderDisconnecter) Blocklist(overlay swarm.Address, d time.Duration) error {
r.mu.Lock()
defer r.mu.Unlock()
r.blocklisted[overlay.String()] = d
return nil
}
func (r *RecorderDisconnecter) IsDisconnected(overlay swarm.Address) bool {
r.mu.RLock()
defer r.mu.RUnlock()
_, yes := r.disconnected[overlay.String()]
return yes
}
func (r *RecorderDisconnecter) IsBlocklisted(overlay swarm.Address) (bool, time.Duration) {
r.mu.RLock()
defer r.mu.RUnlock()
d, yes := r.blocklisted[overlay.String()]
return yes, d
}
......@@ -269,11 +269,13 @@ func newTestNetStore(t *testing.T, recoveryFunc recovery.RecoveryHook) storage.S
_, _, _ = f(peerID, 0)
return nil
}}
server := retrieval.New(nil, mockStorer, ps, logger, serverMockAccounting, nil, nil)
recorder := streamtest.NewRecorderDisconnecter(streamtest.New(
server := retrieval.New(nil, nil, logger, serverMockAccounting, nil, nil)
server.SetStorer(mockStorer)
recorder := streamtest.New(
streamtest.WithProtocols(server.Protocol()),
))
retrieve := retrieval.New(recorder, mockStorer, ps, logger, serverMockAccounting, pricerMock, nil)
)
retrieve := retrieval.New(recorder, ps, logger, serverMockAccounting, pricerMock, nil)
retrieve.SetStorer(mockStorer)
ns := netstore.New(storer, recoveryFunc, retrieve, logger, nil)
return ns
}
......
......@@ -6,7 +6,6 @@ package retrieval
import (
"context"
"errors"
"fmt"
"time"
......@@ -36,7 +35,7 @@ type Interface interface {
}
type Service struct {
streamer p2p.StreamerDisconnecter
streamer p2p.Streamer
peerSuggester topology.EachPeerer
storer storage.Storer
singleflight singleflight.Group
......@@ -46,11 +45,10 @@ type Service struct {
validator swarm.Validator
}
func New(streamer p2p.StreamerDisconnecter, storer storage.Storer, chunkPeerer topology.EachPeerer, logger logging.Logger, accounting accounting.Interface, pricer accounting.Pricer, validator swarm.Validator) *Service {
func New(streamer p2p.Streamer, chunkPeerer topology.EachPeerer, logger logging.Logger, accounting accounting.Interface, pricer accounting.Pricer, validator swarm.Validator) *Service {
return &Service{
streamer: streamer,
peerSuggester: chunkPeerer,
storer: storer,
logger: logger,
accounting: accounting,
pricer: pricer,
......@@ -74,10 +72,11 @@ func (s *Service) Protocol() p2p.ProtocolSpec {
const (
maxPeers = 5
retrieveChunkTimeout = 10 * time.Second
blocklistDuration = time.Minute
)
func (s *Service) RetrieveChunk(ctx context.Context, addr swarm.Address) (swarm.Chunk, error) {
ctx, cancel := context.WithTimeout(ctx, maxPeers*retrieveChunkTimeout)
defer cancel()
v, err, _ := s.singleflight.Do(addr.String(), func() (interface{}, error) {
var skipPeers []swarm.Address
......@@ -90,14 +89,6 @@ func (s *Service) RetrieveChunk(ctx context.Context, addr swarm.Address) (swarm.
}
s.logger.Debugf("retrieval: failed to get chunk %s from peer %s: %v", addr, peer, err)
skipPeers = append(skipPeers, peer)
if errors.Is(err, context.DeadlineExceeded) {
if err := s.streamer.Blocklist(peer, blocklistDuration); err != nil {
s.logger.Errorf("retrieval: unable to block peer %s", peer)
s.logger.Debugf("retrieval: blocking peer %s: %v", peer, err)
} else {
s.logger.Warningf("retrieval: peer %s blocked as unresponsive", peer)
}
}
continue
}
s.logger.Tracef("retrieval: got chunk %s from peer %s", addr, peer)
......@@ -133,7 +124,7 @@ func (s *Service) retrieveChunk(ctx context.Context, addr swarm.Address, skipPee
chunkPrice := s.pricer.PeerPrice(peer, addr)
err = s.accounting.Reserve(peer, chunkPrice)
if err != nil {
return nil, peer, fmt.Errorf("accounting retrieve: %w", err)
return nil, peer, err
}
defer s.accounting.Release(peer, chunkPrice)
......@@ -155,26 +146,26 @@ func (s *Service) retrieveChunk(ctx context.Context, addr swarm.Address, skipPee
if err := w.WriteMsgWithContext(ctx, &pb.Request{
Addr: addr.Bytes(),
}); err != nil {
return nil, peer, fmt.Errorf("write request: %w", err)
return nil, peer, fmt.Errorf("write request: %w peer %s", err, peer.String())
}
var d pb.Delivery
if err := r.ReadMsgWithContext(ctx, &d); err != nil {
return nil, peer, fmt.Errorf("read delivery: %w", err)
return nil, peer, fmt.Errorf("read delivery: %w peer %s", err, peer.String())
}
// credit the peer after successful delivery
chunk = swarm.NewChunk(addr, d.Data)
if !s.validator.Validate(chunk) {
return nil, peer, fmt.Errorf("new chunk: %w", err)
return nil, peer, err
}
err = s.accounting.Credit(peer, chunkPrice)
if err != nil {
return nil, peer, fmt.Errorf("accounting credit: %w", err)
return nil, peer, err
}
return chunk, peer, nil
return chunk, peer, err
}
func (s *Service) closestPeer(addr swarm.Address, skipPeers []swarm.Address) (swarm.Address, error) {
......@@ -228,27 +219,18 @@ func (s *Service) handler(ctx context.Context, p p2p.Peer, stream p2p.Stream) (e
}()
var req pb.Request
if err := r.ReadMsg(&req); err != nil {
return fmt.Errorf("read request: %w", err)
return fmt.Errorf("read request: %w peer %s", err, p.Address.String())
}
ctx = context.WithValue(ctx, requestSourceContextKey{}, p.Address.String())
addr := swarm.NewAddress(req.Addr)
chunk, err := s.storer.Get(ctx, storage.ModeGetRequest, addr)
chunk, err := s.storer.Get(ctx, storage.ModeGetRequest, swarm.NewAddress(req.Addr))
if err != nil {
if errors.Is(err, storage.ErrNotFound) {
// forward the request
chunk, err = s.RetrieveChunk(ctx, addr)
if err != nil {
return fmt.Errorf("retrieve chunk: %w", err)
}
} else {
return fmt.Errorf("get from store: %w", err)
}
return fmt.Errorf("get from store: %w peer %s", err, p.Address.String())
}
if err := w.WriteMsgWithContext(ctx, &pb.Delivery{
Data: chunk.Data(),
}); err != nil {
return fmt.Errorf("write delivery: %w", err)
return fmt.Errorf("write delivery: %w peer %s", err, p.Address.String())
}
// compute the price we charge for this chunk and debit it from p's balance
......@@ -260,3 +242,8 @@ func (s *Service) handler(ctx context.Context, p p2p.Peer, stream p2p.Stream) (e
return nil
}
// SetStorer sets the storer. This call is not goroutine safe.
func (s *Service) SetStorer(storer storage.Storer) {
s.storer = storer
}
......@@ -51,10 +51,11 @@ func TestDelivery(t *testing.T) {
pricerMock := accountingmock.NewPricer(price, price)
// create the server that will handle the request and will serve the response
server := retrieval.New(nil, mockStorer, nil, logger, serverMockAccounting, pricerMock, mockValidator)
recorder := streamtest.NewRecorderDisconnecter(streamtest.New(
server := retrieval.New(nil, nil, logger, serverMockAccounting, pricerMock, mockValidator)
server.SetStorer(mockStorer)
recorder := streamtest.New(
streamtest.WithProtocols(server.Protocol()),
))
)
clientMockAccounting := accountingmock.NewAccounting()
......@@ -69,7 +70,8 @@ func TestDelivery(t *testing.T) {
_, _, _ = f(peerID, 0)
return nil
}}
client := retrieval.New(recorder, clientMockStorer, ps, logger, clientMockAccounting, pricerMock, mockValidator)
client := retrieval.New(recorder, ps, logger, clientMockAccounting, pricerMock, mockValidator)
client.SetStorer(clientMockStorer)
ctx, cancel := context.WithTimeout(context.Background(), testTimeout)
defer cancel()
v, err := client.RetrieveChunk(ctx, reqAddr)
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment