Commit 7e40e573 authored by acud's avatar acud Committed by GitHub

[bee #48] upload syncs to closest - add SyncPeer method (#97)

* add SyncPeer functionality to topology driver to return closest peer to a chunk when picking a peer to send a chunk to
* Import XOR-Distance functionality
parent 29226cc7
......@@ -164,7 +164,7 @@ func NewBee(o Options) (*Bee, error) {
return nil, fmt.Errorf("hive service: %w", err)
}
topologyDriver := full.New(hive, addressbook, p2ps, logger)
topologyDriver := full.New(hive, addressbook, p2ps, logger, address)
hive.SetPeerAddedHandler(topologyDriver.AddPeer)
p2ps.SetPeerAddedHandler(topologyDriver.AddPeer)
addrs, err := p2ps.Addresses()
......
// Copyright 2020 The Swarm Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package pushsync
// Copyright 2020 The Swarm Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package pushsync
// Copyright 2020 The Swarm Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package swarm
import (
"errors"
"math/big"
)
// Distance returns the distance between address x and address y as a (comparable) big integer using the distance metric defined in the swarm specification.
// Fails if not all addresses are of equal length.
func Distance(x, y []byte) (*big.Int, error) {
distanceBytes, err := DistanceRaw(x, y)
if err != nil {
return nil, err
}
r := big.NewInt(0)
r.SetBytes(distanceBytes)
return r, nil
}
// DistanceRaw returns the distance between address x and address y in big-endian binary format using the distance metric defined in the swarm specfication.
// Fails if not all addresses are of equal length.
func DistanceRaw(x, y []byte) ([]byte, error) {
if len(x) != len(y) {
return nil, errors.New("address length must match")
}
c := make([]byte, len(x))
for i, addr := range x {
c[i] = addr ^ y[i]
}
return c, nil
}
// DistanceCmp compares x and y to a in terms of the distance metric defined in the swarm specfication.
// it returns:
// 1 if x is closer to a than y
// 0 if x and y are equally far apart from a (this means that x and y are the same address)
// -1 if x is farther from a than y
// Fails if not all addresses are of equal length.
func DistanceCmp(a, x, y []byte) (int, error) {
if len(a) != len(x) || len(a) != len(y) {
return 0, errors.New("address length must match")
}
for i := range a {
dx := x[i] ^ a[i]
dy := y[i] ^ a[i]
if dx == dy {
continue
} else if dx < dy {
return 1, nil
}
return -1, nil
}
return 0, nil
}
// Copyright 2020 The Swarm Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package swarm
import (
"testing"
)
type distanceTest struct {
x []byte
y []byte
result string
}
type distanceCmpTest struct {
a []byte
x []byte
y []byte
result int
}
var (
distanceTests = []distanceTest{
{
x: MustParseHexAddress("9100000000000000000000000000000000000000000000000000000000000000").Bytes(),
y: MustParseHexAddress("8200000000000000000000000000000000000000000000000000000000000000").Bytes(),
result: "8593944123082061379093159043613555660984881674403010612303492563087302590464",
},
}
distanceCmpTests = []distanceCmpTest{
{
a: MustParseHexAddress("9100000000000000000000000000000000000000000000000000000000000000").Bytes(), // 10010001
x: MustParseHexAddress("8200000000000000000000000000000000000000000000000000000000000000").Bytes(), // 10000010 xor(0x91,0x82) = 19
y: MustParseHexAddress("1200000000000000000000000000000000000000000000000000000000000000").Bytes(), // 00010010 xor(0x91,0x12) = 131
result: 1,
},
{
a: MustParseHexAddress("9100000000000000000000000000000000000000000000000000000000000000").Bytes(),
x: MustParseHexAddress("1200000000000000000000000000000000000000000000000000000000000000").Bytes(),
y: MustParseHexAddress("8200000000000000000000000000000000000000000000000000000000000000").Bytes(),
result: -1,
},
{
a: MustParseHexAddress("9100000000000000000000000000000000000000000000000000000000000000").Bytes(),
x: MustParseHexAddress("1200000000000000000000000000000000000000000000000000000000000000").Bytes(),
y: MustParseHexAddress("1200000000000000000000000000000000000000000000000000000000000000").Bytes(),
result: 0,
},
}
)
// TestDistance tests the correctness of the distance calculation.
func TestDistance(t *testing.T) {
for _, dt := range distanceTests {
distance, err := Distance(dt.x, dt.y)
if err != nil {
t.Fatal(err)
}
if distance.String() != dt.result {
t.Fatalf("incorrect distance, expected %s, got %s (x: %x, y: %x)", dt.result, distance.String(), dt.x, dt.y)
}
}
}
// TestDistanceCmp tests the distance comparison method.
func TestDistanceCmp(t *testing.T) {
for _, dt := range distanceCmpTests {
direction, err := DistanceCmp(dt.a, dt.x, dt.y)
if err != nil {
t.Fatal(err)
}
if direction != dt.result {
t.Fatalf("incorrect distance compare, expected %d, got %d (a: %x, x: %x, y: %x)", dt.result, direction, dt.a, dt.x, dt.y)
}
}
}
......@@ -24,13 +24,15 @@ func init() {
rand.Seed(time.Now().UnixNano())
}
var _ topology.Driver = (*Driver)(nil)
var _ topology.Driver = (*driver)(nil)
// Driver drives the connectivity between nodes. It is a basic implementation of a connectivity Driver.
// driver drives the connectivity between nodes. It is a basic implementation of a connectivity driver.
// that enabled full connectivity in the sense that:
// - Every peer which is added to the Driver gets broadcasted to every other peer regardless of its address.
// - Every peer which is added to the driver gets broadcasted to every other peer regardless of its address.
// - A random peer is picked when asking for a peer to retrieve an arbitrary chunk (Peerer interface).
type Driver struct {
type driver struct {
base swarm.Address // the base address for this node
discovery discovery.Driver
addressBook addressbook.GetPutter
p2pService p2p.Service
......@@ -39,8 +41,9 @@ type Driver struct {
logger logging.Logger
}
func New(disc discovery.Driver, addressBook addressbook.GetPutter, p2pService p2p.Service, logger logging.Logger) *Driver {
return &Driver{
func New(disc discovery.Driver, addressBook addressbook.GetPutter, p2pService p2p.Service, logger logging.Logger, baseAddress swarm.Address) topology.Driver {
return &driver{
base: baseAddress,
discovery: disc,
addressBook: addressBook,
p2pService: p2pService,
......@@ -52,7 +55,7 @@ func New(disc discovery.Driver, addressBook addressbook.GetPutter, p2pService p2
// AddPeer adds a new peer to the topology driver.
// The peer would be subsequently broadcasted to all connected peers.
// All conneceted peers are also broadcasted to the new peer.
func (d *Driver) AddPeer(ctx context.Context, addr swarm.Address) error {
func (d *driver) AddPeer(ctx context.Context, addr swarm.Address) error {
d.mtx.Lock()
if _, ok := d.receivedPeers[addr.ByteString()]; ok {
d.mtx.Unlock()
......@@ -112,7 +115,7 @@ func (d *Driver) AddPeer(ctx context.Context, addr swarm.Address) error {
}
// ChunkPeer is used to suggest a peer to ask a certain chunk from.
func (d *Driver) ChunkPeer(addr swarm.Address) (peerAddr swarm.Address, err error) {
func (d *driver) ChunkPeer(addr swarm.Address) (peerAddr swarm.Address, err error) {
connectedPeers := d.p2pService.Peers()
if len(connectedPeers) == 0 {
return swarm.Address{}, topology.ErrNotFound
......@@ -130,6 +133,52 @@ func (d *Driver) ChunkPeer(addr swarm.Address) (peerAddr swarm.Address, err erro
return swarm.Address{}, topology.ErrNotFound
}
// SyncPeer returns a peer to which we would like to sync an arbitrary
// chunk address. Returns the closest peer in relation to the chunk.
func (d *driver) SyncPeer(addr swarm.Address) (swarm.Address, error) {
connectedPeers := d.p2pService.Peers()
if len(connectedPeers) == 0 {
return swarm.Address{}, topology.ErrNotFound
}
overlays := make([]swarm.Address, len(connectedPeers))
for i, v := range connectedPeers {
overlays[i] = v.Address
}
return closestPeer(addr, d.base, overlays)
}
// closestPeer returns the closest peer from the supplied peers slice.
// returns topology.ErrWantSelf if the base address is the closest
func closestPeer(addr, self swarm.Address, peers []swarm.Address) (swarm.Address, error) {
// start checking closest from _self_
closest := self
for _, peer := range peers {
dcmp, err := swarm.DistanceCmp(addr.Bytes(), closest.Bytes(), peer.Bytes())
if err != nil {
return swarm.Address{}, err
}
switch dcmp {
case 0:
// do nothing
case -1:
// current peer is closer
closest = peer
case 1:
// closest is already closer to chunk
// do nothing
}
}
// check if self
if closest.Equal(self) {
return swarm.Address{}, topology.ErrWantSelf
}
return closest, nil
}
func isConnected(addr swarm.Address, connectedPeers []p2p.Peer) bool {
for _, p := range connectedPeers {
if p.Address.Equal(addr) {
......
......@@ -51,7 +51,7 @@ func TestAddPeer(t *testing.T) {
return overlay, nil
}))
fullDriver := full.New(discovery, ab, p2p, logger)
fullDriver := full.New(discovery, ab, p2p, logger, overlay)
multiaddr, err := ma.NewMultiaddr(underlay)
if err != nil {
t.Fatal(err)
......@@ -81,7 +81,7 @@ func TestAddPeer(t *testing.T) {
return swarm.Address{}, nil
}))
fullDriver := full.New(discovery, ab, p2p, logger)
fullDriver := full.New(discovery, ab, p2p, logger, overlay)
err := fullDriver.AddPeer(context.Background(), overlay)
if !errors.Is(err, topology.ErrNotFound) {
t.Fatalf("full conn driver returned err %v", err)
......@@ -105,7 +105,7 @@ func TestAddPeer(t *testing.T) {
return connectedPeers
}))
fullDriver := full.New(discovery, ab, p2p, logger)
fullDriver := full.New(discovery, ab, p2p, logger, overlay)
multiaddr, err := ma.NewMultiaddr(underlay)
if err != nil {
t.Fatal("error creating multiaddr")
......@@ -152,7 +152,7 @@ func TestAddPeer(t *testing.T) {
return connectedPeers
}))
fullDriver := full.New(discovery, ab, p2ps, logger)
fullDriver := full.New(discovery, ab, p2ps, logger, overlay)
multiaddr, err := ma.NewMultiaddr(underlay)
if err != nil {
t.Fatal(err)
......@@ -186,6 +186,83 @@ func TestAddPeer(t *testing.T) {
})
}
// TestSyncPeer tests that SyncPeer method returns closest connected peer to a given chunk.
func TestSyncPeer(t *testing.T) {
logger := logging.New(ioutil.Discard, 0)
baseOverlay := swarm.MustParseHexAddress("0000000000000000000000000000000000000000000000000000000000000000") // base is 0000
connectedPeers := []p2p.Peer{
{
Address: swarm.MustParseHexAddress("8000000000000000000000000000000000000000000000000000000000000000"), // binary 1000 -> po 0 to base
},
{
Address: swarm.MustParseHexAddress("4000000000000000000000000000000000000000000000000000000000000000"), // binary 0100 -> po 1 to base
},
{
Address: swarm.MustParseHexAddress("6000000000000000000000000000000000000000000000000000000000000000"), // binary 0110 -> po 1 to base
},
}
discovery := mock.NewDiscovery()
statestore := mockstate.NewStateStore()
ab := addressbook.New(statestore)
p2ps := p2pmock.New(p2pmock.WithConnectFunc(func(ctx context.Context, addr ma.Multiaddr) (swarm.Address, error) {
return baseOverlay, nil
}), p2pmock.WithPeersFunc(func() []p2p.Peer {
return connectedPeers
}))
fullDriver := full.New(discovery, ab, p2ps, logger, baseOverlay)
for _, tc := range []struct {
chunkAddress swarm.Address // chunk address to test
expectedPeer int // points to the index of the connectedPeers slice. -1 means self (baseOverlay)
}{
{
chunkAddress: swarm.MustParseHexAddress("7000000000000000000000000000000000000000000000000000000000000000"), // 0111, wants peer 2
expectedPeer: 2,
},
{
chunkAddress: swarm.MustParseHexAddress("c000000000000000000000000000000000000000000000000000000000000000"), // 1100, want peer 0
expectedPeer: 0,
},
{
chunkAddress: swarm.MustParseHexAddress("e000000000000000000000000000000000000000000000000000000000000000"), // 1110, want peer 0
expectedPeer: 0,
},
{
chunkAddress: swarm.MustParseHexAddress("a000000000000000000000000000000000000000000000000000000000000000"), // 1010, want peer 0
expectedPeer: 0,
},
{
chunkAddress: swarm.MustParseHexAddress("4000000000000000000000000000000000000000000000000000000000000000"), // 0100, want peer 1
expectedPeer: 1,
},
{
chunkAddress: swarm.MustParseHexAddress("5000000000000000000000000000000000000000000000000000000000000000"), // 0101, want peer 1
expectedPeer: 1,
},
{
chunkAddress: swarm.MustParseHexAddress("0000001000000000000000000000000000000000000000000000000000000000"), // want self
expectedPeer: -1,
},
} {
peer, err := fullDriver.SyncPeer(tc.chunkAddress)
if err != nil {
if tc.expectedPeer == -1 && !errors.Is(err, topology.ErrWantSelf) {
t.Fatalf("wanted %v but got %v", topology.ErrWantSelf, err)
}
continue
}
expected := connectedPeers[tc.expectedPeer].Address
if !peer.Equal(expected) {
t.Fatalf("peers not equal. got %s expected %s", peer, expected)
}
}
}
func checkAddreseeRecords(discovery *mock.Discovery, addr swarm.Address, expected []p2p.Peer) error {
got, exists := discovery.AddresseeRecords(addr)
if exists != true {
......@@ -197,6 +274,5 @@ func checkAddreseeRecords(discovery *mock.Discovery, addr swarm.Address, expecte
return fmt.Errorf("addressee record expected %s, got %s ", e.Address.String(), got[i].String())
}
}
return nil
}
......@@ -12,10 +12,12 @@ import (
)
var ErrNotFound = errors.New("no peer found")
var ErrWantSelf = errors.New("node wants self")
type Driver interface {
PeerAdder
ChunkPeerer
SyncPeerer
}
type PeerAdder interface {
......@@ -25,3 +27,7 @@ type PeerAdder interface {
type ChunkPeerer interface {
ChunkPeer(addr swarm.Address) (peerAddr swarm.Address, err error)
}
type SyncPeerer interface {
SyncPeer(addr swarm.Address) (peerAddr swarm.Address, err error)
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment