Commit 2f565f9f authored by Janoš Guljaš's avatar Janoš Guljaš Committed by GitHub

suggest only downstream peers in retrieval forwarding (#705)

parent f7088db1
...@@ -241,7 +241,7 @@ func NewBee(addr string, swarmAddress swarm.Address, keystore keystore.Service, ...@@ -241,7 +241,7 @@ func NewBee(addr string, swarmAddress swarm.Address, keystore keystore.Service,
chunkvalidator := swarm.NewChunkValidator(soc.NewValidator(), content.NewValidator()) chunkvalidator := swarm.NewChunkValidator(soc.NewValidator(), content.NewValidator())
retrieve := retrieval.New(p2ps, kad, logger, acc, accounting.NewFixedPricer(swarmAddress, 10), chunkvalidator) retrieve := retrieval.New(swarmAddress, p2ps, kad, logger, acc, accounting.NewFixedPricer(swarmAddress, 10), chunkvalidator)
tagg := tags.NewTags(stateStore, logger) tagg := tags.NewTags(stateStore, logger)
b.tagsCloser = tagg b.tagsCloser = tagg
......
...@@ -260,12 +260,12 @@ func newTestNetStore(t *testing.T, recoveryFunc recovery.RecoveryHook) storage.S ...@@ -260,12 +260,12 @@ func newTestNetStore(t *testing.T, recoveryFunc recovery.RecoveryHook) storage.S
_, _, _ = f(peerID, 0) _, _, _ = f(peerID, 0)
return nil return nil
}} }}
server := retrieval.New(nil, nil, logger, serverMockAccounting, nil, nil) server := retrieval.New(swarm.ZeroAddress, nil, nil, logger, serverMockAccounting, nil, nil)
server.SetStorer(mockStorer) server.SetStorer(mockStorer)
recorder := streamtest.New( recorder := streamtest.New(
streamtest.WithProtocols(server.Protocol()), streamtest.WithProtocols(server.Protocol()),
) )
retrieve := retrieval.New(recorder, ps, logger, serverMockAccounting, pricerMock, nil) retrieve := retrieval.New(swarm.ZeroAddress, recorder, ps, logger, serverMockAccounting, pricerMock, nil)
retrieve.SetStorer(mockStorer) retrieve.SetStorer(mockStorer)
ns := netstore.New(storer, recoveryFunc, retrieve, logger, nil) ns := netstore.New(storer, recoveryFunc, retrieve, logger, nil)
return ns return ns
......
...@@ -35,6 +35,7 @@ type Interface interface { ...@@ -35,6 +35,7 @@ type Interface interface {
} }
type Service struct { type Service struct {
addr swarm.Address
streamer p2p.Streamer streamer p2p.Streamer
peerSuggester topology.EachPeerer peerSuggester topology.EachPeerer
storer storage.Storer storer storage.Storer
...@@ -45,8 +46,9 @@ type Service struct { ...@@ -45,8 +46,9 @@ type Service struct {
validator swarm.Validator validator swarm.Validator
} }
func New(streamer p2p.Streamer, chunkPeerer topology.EachPeerer, logger logging.Logger, accounting accounting.Interface, pricer accounting.Pricer, validator swarm.Validator) *Service { func New(addr swarm.Address, streamer p2p.Streamer, chunkPeerer topology.EachPeerer, logger logging.Logger, accounting accounting.Interface, pricer accounting.Pricer, validator swarm.Validator) *Service {
return &Service{ return &Service{
addr: addr,
streamer: streamer, streamer: streamer,
peerSuggester: chunkPeerer, peerSuggester: chunkPeerer,
logger: logger, logger: logger,
...@@ -205,6 +207,14 @@ func (s *Service) closestPeer(addr swarm.Address, skipPeers []swarm.Address) (sw ...@@ -205,6 +207,14 @@ func (s *Service) closestPeer(addr swarm.Address, skipPeers []swarm.Address) (sw
return swarm.Address{}, topology.ErrNotFound return swarm.Address{}, topology.ErrNotFound
} }
dcmp, err := swarm.DistanceCmp(addr.Bytes(), closest.Bytes(), s.addr.Bytes())
if err != nil {
return swarm.Address{}, fmt.Errorf("distance compare addr %s closest %s base address %s: %w", addr.String(), closest.String(), s.addr.String(), err)
}
if dcmp != 1 {
return swarm.Address{}, topology.ErrNotFound
}
return closest, nil return closest, nil
} }
......
...@@ -51,7 +51,7 @@ func TestDelivery(t *testing.T) { ...@@ -51,7 +51,7 @@ func TestDelivery(t *testing.T) {
pricerMock := accountingmock.NewPricer(price, price) pricerMock := accountingmock.NewPricer(price, price)
// create the server that will handle the request and will serve the response // create the server that will handle the request and will serve the response
server := retrieval.New(nil, nil, logger, serverMockAccounting, pricerMock, mockValidator) server := retrieval.New(swarm.MustParseHexAddress("00112234"), nil, nil, logger, serverMockAccounting, pricerMock, mockValidator)
server.SetStorer(mockStorer) server.SetStorer(mockStorer)
recorder := streamtest.New( recorder := streamtest.New(
streamtest.WithProtocols(server.Protocol()), streamtest.WithProtocols(server.Protocol()),
...@@ -70,7 +70,7 @@ func TestDelivery(t *testing.T) { ...@@ -70,7 +70,7 @@ func TestDelivery(t *testing.T) {
_, _, _ = f(peerID, 0) _, _, _ = f(peerID, 0)
return nil return nil
}} }}
client := retrieval.New(recorder, ps, logger, clientMockAccounting, pricerMock, mockValidator) client := retrieval.New(swarm.MustParseHexAddress("9ee7add8"), recorder, ps, logger, clientMockAccounting, pricerMock, mockValidator)
client.SetStorer(clientMockStorer) client.SetStorer(clientMockStorer)
ctx, cancel := context.WithTimeout(context.Background(), testTimeout) ctx, cancel := context.WithTimeout(context.Background(), testTimeout)
defer cancel() defer cancel()
...@@ -134,6 +134,69 @@ func TestDelivery(t *testing.T) { ...@@ -134,6 +134,69 @@ func TestDelivery(t *testing.T) {
} }
} }
func TestRetrieveChunk(t *testing.T) {
logger := logging.New(ioutil.Discard, 0)
mockValidator := swarm.NewChunkValidator(mock.NewValidator(true))
pricer := accountingmock.NewPricer(1, 1)
// requesting a chunk from downstream peer is expected
t.Run("downstream", func(t *testing.T) {
serverAddress := swarm.MustParseHexAddress("03")
chunkAddress := swarm.MustParseHexAddress("02")
clientAddress := swarm.MustParseHexAddress("01")
serverStorer := storemock.NewStorer()
chunk := swarm.NewChunk(chunkAddress, []byte("some data"))
_, err := serverStorer.Put(context.Background(), storage.ModePutUpload, chunk)
if err != nil {
t.Fatal(err)
}
server := retrieval.New(serverAddress, nil, nil, logger, accountingmock.NewAccounting(), pricer, mockValidator)
server.SetStorer(serverStorer)
recorder := streamtest.New(streamtest.WithProtocols(server.Protocol()))
clientSuggester := mockPeerSuggester{eachPeerRevFunc: func(f topology.EachPeerFunc) error {
_, _, _ = f(serverAddress, 0)
return nil
}}
client := retrieval.New(clientAddress, recorder, clientSuggester, logger, accountingmock.NewAccounting(), pricer, mockValidator)
got, err := client.RetrieveChunk(context.Background(), chunkAddress)
if err != nil {
t.Fatal(err)
}
if !bytes.Equal(got.Data(), chunk.Data()) {
t.Fatalf("got data %x, want %x", got.Data(), chunk.Data())
}
})
// requesting a chunk from the upstream peer should not be possible to avoid request forwarding loops
t.Run("upstream", func(t *testing.T) {
serverAddress := swarm.MustParseHexAddress("01")
chunkAddress := swarm.MustParseHexAddress("02")
clientAddress := swarm.MustParseHexAddress("03")
server := retrieval.New(serverAddress, nil, nil, logger, accountingmock.NewAccounting(), pricer, mockValidator)
recorder := streamtest.New(streamtest.WithProtocols(server.Protocol()))
clientSuggester := mockPeerSuggester{eachPeerRevFunc: func(f topology.EachPeerFunc) error {
_, _, _ = f(serverAddress, 0)
return nil
}}
client := retrieval.New(clientAddress, recorder, clientSuggester, logger, accountingmock.NewAccounting(), pricer, mockValidator)
// do not request from the upstream peer
_, err := client.RetrieveChunk(context.Background(), chunkAddress)
if !errors.Is(err, topology.ErrNotFound) {
t.Fatalf("got error %v, want %v", err, topology.ErrNotFound)
}
})
}
type mockPeerSuggester struct { type mockPeerSuggester struct {
eachPeerRevFunc func(f topology.EachPeerFunc) error eachPeerRevFunc func(f topology.EachPeerFunc) error
} }
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment