Commit 54f1ca36 authored by acud's avatar acud Committed by GitHub

remove SyncPeerer and ChunkPeerer in favor of ClosestPeerer since...

remove SyncPeerer and ChunkPeerer in favor of ClosestPeerer since functionalities are overlapping (#109)
parent 78483a44
...@@ -25,14 +25,14 @@ const ( ...@@ -25,14 +25,14 @@ const (
type Service struct { type Service struct {
streamer p2p.Streamer streamer p2p.Streamer
peerSuggester topology.ChunkPeerer peerSuggester topology.ClosestPeerer
storer storage.Storer storer storage.Storer
logger logging.Logger logger logging.Logger
} }
type Options struct { type Options struct {
Streamer p2p.Streamer Streamer p2p.Streamer
ChunkPeerer topology.ChunkPeerer ChunkPeerer topology.ClosestPeerer
Storer storage.Storer Storer storage.Storer
Logger logging.Logger Logger logging.Logger
} }
...@@ -63,7 +63,7 @@ func (s *Service) Protocol() p2p.ProtocolSpec { ...@@ -63,7 +63,7 @@ func (s *Service) Protocol() p2p.ProtocolSpec {
} }
func (s *Service) RetrieveChunk(ctx context.Context, addr swarm.Address) (data []byte, err error) { func (s *Service) RetrieveChunk(ctx context.Context, addr swarm.Address) (data []byte, err error) {
peerID, err := s.peerSuggester.ChunkPeer(addr) peerID, err := s.peerSuggester.ClosestPeer(addr)
if err != nil { if err != nil {
return nil, err return nil, err
} }
......
...@@ -75,7 +75,7 @@ func TestDelivery(t *testing.T) { ...@@ -75,7 +75,7 @@ func TestDelivery(t *testing.T) {
if !bytes.Equal(v, reqData) { if !bytes.Equal(v, reqData) {
t.Fatalf("request and response data not equal. got %s want %s", v, reqData) t.Fatalf("request and response data not equal. got %s want %s", v, reqData)
} }
peerID, _ := ps.ChunkPeer(swarm.ZeroAddress) peerID, _ := ps.ClosestPeer(swarm.ZeroAddress)
records, err := recorder.Records(peerID, "retrieval", "1.0.0", "retrieval") records, err := recorder.Records(peerID, "retrieval", "1.0.0", "retrieval")
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
...@@ -124,6 +124,6 @@ type mockPeerSuggester struct { ...@@ -124,6 +124,6 @@ type mockPeerSuggester struct {
spFunc func(swarm.Address) (swarm.Address, error) spFunc func(swarm.Address) (swarm.Address, error)
} }
func (v mockPeerSuggester) ChunkPeer(addr swarm.Address) (swarm.Address, error) { func (v mockPeerSuggester) ClosestPeer(addr swarm.Address) (swarm.Address, error) {
return v.spFunc(addr) return v.spFunc(addr)
} }
...@@ -114,48 +114,18 @@ func (d *driver) AddPeer(ctx context.Context, addr swarm.Address) error { ...@@ -114,48 +114,18 @@ func (d *driver) AddPeer(ctx context.Context, addr swarm.Address) error {
return nil return nil
} }
// ChunkPeer is used to suggest a peer to ask a certain chunk from. // ClosestPeer returns the closest connected peer we have in relation to a
func (d *driver) ChunkPeer(addr swarm.Address) (peerAddr swarm.Address, err error) { // given chunk address. Returns topology.ErrWantSelf in case base is the closest to the chunk.
func (d *driver) ClosestPeer(addr swarm.Address) (swarm.Address, error) {
connectedPeers := d.p2pService.Peers() connectedPeers := d.p2pService.Peers()
if len(connectedPeers) == 0 { if len(connectedPeers) == 0 {
return swarm.Address{}, topology.ErrNotFound return swarm.Address{}, topology.ErrNotFound
} }
itemIdx := rand.Intn(len(connectedPeers))
i := 0
for _, v := range connectedPeers {
if i == itemIdx {
return v.Address, nil
}
i++
}
return swarm.Address{}, topology.ErrNotFound
}
// SyncPeer returns a peer to which we would like to sync an arbitrary
// chunk address. Returns the closest peer in relation to the chunk.
func (d *driver) SyncPeer(addr swarm.Address) (swarm.Address, error) {
connectedPeers := d.p2pService.Peers()
if len(connectedPeers) == 0 {
return swarm.Address{}, topology.ErrNotFound
}
overlays := make([]swarm.Address, len(connectedPeers))
for i, v := range connectedPeers {
overlays[i] = v.Address
}
return closestPeer(addr, d.base, overlays)
}
// closestPeer returns the closest peer from the supplied peers slice.
// returns topology.ErrWantSelf if the base address is the closest
func closestPeer(addr, self swarm.Address, peers []swarm.Address) (swarm.Address, error) {
// start checking closest from _self_ // start checking closest from _self_
closest := self closest := d.base
for _, peer := range peers { for _, peer := range connectedPeers {
dcmp, err := swarm.DistanceCmp(addr.Bytes(), closest.Bytes(), peer.Bytes()) dcmp, err := swarm.DistanceCmp(addr.Bytes(), closest.Bytes(), peer.Address.Bytes())
if err != nil { if err != nil {
return swarm.Address{}, err return swarm.Address{}, err
} }
...@@ -164,7 +134,7 @@ func closestPeer(addr, self swarm.Address, peers []swarm.Address) (swarm.Address ...@@ -164,7 +134,7 @@ func closestPeer(addr, self swarm.Address, peers []swarm.Address) (swarm.Address
// do nothing // do nothing
case -1: case -1:
// current peer is closer // current peer is closer
closest = peer closest = peer.Address
case 1: case 1:
// closest is already closer to chunk // closest is already closer to chunk
// do nothing // do nothing
...@@ -172,7 +142,7 @@ func closestPeer(addr, self swarm.Address, peers []swarm.Address) (swarm.Address ...@@ -172,7 +142,7 @@ func closestPeer(addr, self swarm.Address, peers []swarm.Address) (swarm.Address
} }
// check if self // check if self
if closest.Equal(self) { if closest.Equal(d.base) {
return swarm.Address{}, topology.ErrWantSelf return swarm.Address{}, topology.ErrWantSelf
} }
......
...@@ -187,7 +187,7 @@ func TestAddPeer(t *testing.T) { ...@@ -187,7 +187,7 @@ func TestAddPeer(t *testing.T) {
} }
// TestSyncPeer tests that SyncPeer method returns closest connected peer to a given chunk. // TestSyncPeer tests that SyncPeer method returns closest connected peer to a given chunk.
func TestSyncPeer(t *testing.T) { func TestClosestPeer(t *testing.T) {
logger := logging.New(ioutil.Discard, 0) logger := logging.New(ioutil.Discard, 0)
baseOverlay := swarm.MustParseHexAddress("0000000000000000000000000000000000000000000000000000000000000000") // base is 0000 baseOverlay := swarm.MustParseHexAddress("0000000000000000000000000000000000000000000000000000000000000000") // base is 0000
connectedPeers := []p2p.Peer{ connectedPeers := []p2p.Peer{
...@@ -247,7 +247,7 @@ func TestSyncPeer(t *testing.T) { ...@@ -247,7 +247,7 @@ func TestSyncPeer(t *testing.T) {
expectedPeer: -1, expectedPeer: -1,
}, },
} { } {
peer, err := fullDriver.SyncPeer(tc.chunkAddress) peer, err := fullDriver.ClosestPeer(tc.chunkAddress)
if err != nil { if err != nil {
if tc.expectedPeer == -1 && !errors.Is(err, topology.ErrWantSelf) { if tc.expectedPeer == -1 && !errors.Is(err, topology.ErrWantSelf) {
t.Fatalf("wanted %v but got %v", topology.ErrWantSelf, err) t.Fatalf("wanted %v but got %v", topology.ErrWantSelf, err)
......
...@@ -16,18 +16,13 @@ var ErrWantSelf = errors.New("node wants self") ...@@ -16,18 +16,13 @@ var ErrWantSelf = errors.New("node wants self")
type Driver interface { type Driver interface {
PeerAdder PeerAdder
ChunkPeerer ClosestPeerer
SyncPeerer
} }
type PeerAdder interface { type PeerAdder interface {
AddPeer(ctx context.Context, addr swarm.Address) error AddPeer(ctx context.Context, addr swarm.Address) error
} }
type ChunkPeerer interface { type ClosestPeerer interface {
ChunkPeer(addr swarm.Address) (peerAddr swarm.Address, err error) ClosestPeer(addr swarm.Address) (peerAddr swarm.Address, err error)
}
type SyncPeerer interface {
SyncPeer(addr swarm.Address) (peerAddr swarm.Address, err error)
} }
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment