Commit d1dcc1dd authored by acud, committed by GitHub

topology: remove full connectivity driver (#1153)

parent 06d1cb40
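For orientation, the file removed below implements the topology.Driver interface (note the compile-time assertion var _ topology.Driver = (*driver)(nil)). The sketch below collects the method set as it appears in this diff into a single illustrative interface; the actual topology.Driver declaration in pkg/topology is not part of this diff and may be composed of smaller interfaces, so treat the grouping as a summary only, not the canonical declaration.

package sketch

import (
	"context"

	"github.com/ethersphere/bee/pkg/swarm"
	"github.com/ethersphere/bee/pkg/topology"
)

// fullDriver is an illustrative grouping of the methods implemented by the
// removed driver (see the file below); it is not the canonical
// topology.Driver declaration.
type fullDriver interface {
	AddPeers(ctx context.Context, addrs ...swarm.Address) error
	ClosestPeer(addr swarm.Address, skipPeers ...swarm.Address) (swarm.Address, error)
	Connected(ctx context.Context, addr swarm.Address) error
	Disconnected(addr swarm.Address)
	NeighborhoodDepth() uint8
	EachPeer(f topology.EachPeerFunc) error
	EachPeerRev(f topology.EachPeerFunc) error
	SubscribePeersChange() (c <-chan struct{}, unsubscribe func())
	MarshalJSON() ([]byte, error)
	String() string
	Close() error
}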
// Copyright 2020 The Swarm Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package full
import (
"context"
"encoding/json"
"errors"
"fmt"
"math/rand"
"sync"
"time"
"github.com/ethersphere/bee/pkg/addressbook"
"github.com/ethersphere/bee/pkg/discovery"
"github.com/ethersphere/bee/pkg/logging"
"github.com/ethersphere/bee/pkg/p2p"
"github.com/ethersphere/bee/pkg/swarm"
"github.com/ethersphere/bee/pkg/topology"
)
func init() {
rand.Seed(time.Now().UnixNano())
}
var _ topology.Driver = (*driver)(nil)
// driver drives the connectivity between nodes. It is a basic implementation of a connectivity driver
// that enables full connectivity in the sense that:
// - Every peer which is added to the driver is broadcast to every other peer regardless of its address.
// - The connected peer closest to a given chunk address is returned when asking for a peer to retrieve that chunk (ClosestPeer).
type driver struct {
base swarm.Address // the base address for this node
discovery discovery.Driver
addressBook addressbook.Interface
p2pService p2p.Service
receivedPeers map[string]struct{} // track already received peers. Note: implement cleanup or expiration if needed to prevent unbounded growth
backoffActive bool
logger logging.Logger
mtx sync.Mutex
addPeerCh chan swarm.Address
quit chan struct{}
}
func New(disc discovery.Driver, addressBook addressbook.Interface, p2pService p2p.Service, logger logging.Logger, baseAddress swarm.Address) topology.Driver {
d := &driver{
base: baseAddress,
discovery: disc,
addressBook: addressBook,
p2pService: p2pService,
receivedPeers: make(map[string]struct{}),
logger: logger,
addPeerCh: make(chan swarm.Address, 64),
quit: make(chan struct{}),
}
go d.manage()
return d
}
func (d *driver) manage() {
ctx, cancel := context.WithCancel(context.Background())
go func() {
<-d.quit
cancel()
}()
for {
select {
case <-d.quit:
return
case addr := <-d.addPeerCh:
d.mtx.Lock()
if _, ok := d.receivedPeers[addr.ByteString()]; ok {
d.mtx.Unlock()
return
}
d.receivedPeers[addr.ByteString()] = struct{}{}
d.mtx.Unlock()
connectedPeers := d.p2pService.Peers()
bzzAddress, err := d.addressBook.Get(addr)
if err != nil {
return
}
if !isConnected(addr, connectedPeers) {
_, err := d.p2pService.Connect(ctx, bzzAddress.Underlay)
if err != nil {
d.mtx.Lock()
delete(d.receivedPeers, addr.ByteString())
d.mtx.Unlock()
var e *p2p.ConnectionBackoffError
if errors.As(err, &e) {
d.backoff(e.TryAfter())
}
return
}
}
connectedAddrs := []swarm.Address{}
for _, addressee := range connectedPeers {
// skip newly added peer
if addressee.Address.Equal(addr) {
continue
}
connectedAddrs = append(connectedAddrs, addressee.Address)
if err := d.discovery.BroadcastPeers(ctx, addressee.Address, addr); err != nil {
return
}
}
if len(connectedAddrs) == 0 {
return
}
_ = d.discovery.BroadcastPeers(ctx, addr, connectedAddrs...)
}
}
}
// AddPeers adds new peers to the topology driver.
// Each peer is subsequently broadcast to all connected peers,
// and all connected peers are broadcast to the new peer.
func (d *driver) AddPeers(ctx context.Context, addrs ...swarm.Address) error {
for _, addr := range addrs {
d.addPeerCh <- addr
}
return nil
}
// ClosestPeer returns the closest connected peer we have in relation to a
// given chunk address. It returns topology.ErrWantSelf if the base address is the closest to the chunk.
func (d *driver) ClosestPeer(addr swarm.Address, skipPeers ...swarm.Address) (swarm.Address, error) {
connectedPeers := d.p2pService.Peers()
if len(connectedPeers) == 0 {
return swarm.Address{}, topology.ErrNotFound
}
// start checking closest from _self_
closest := d.base
skipPeer := false
for _, peer := range connectedPeers {
if len(skipPeers) > 0 {
for _, a := range skipPeers {
if a.Equal(peer.Address) {
skipPeer = true
break
}
}
if skipPeer {
skipPeer = false
continue
}
}
dcmp, err := swarm.DistanceCmp(addr.Bytes(), closest.Bytes(), peer.Address.Bytes())
if err != nil {
return swarm.Address{}, err
}
switch dcmp {
case 0:
// do nothing
case -1:
// current peer is closer
closest = peer.Address
case 1:
// closest is already closer to chunk
// do nothing
}
}
// check if self
if closest.Equal(d.base) {
return swarm.Address{}, topology.ErrWantSelf
}
return closest, nil
}
func (d *driver) Connected(ctx context.Context, addr swarm.Address) error {
return d.AddPeers(ctx, addr)
}
func (*driver) Disconnected(swarm.Address) {
// TODO: implement if necessary
}
func (*driver) NeighborhoodDepth() uint8 {
return 0
}
// EachPeer iterates from closest bin to farthest
func (*driver) EachPeer(_ topology.EachPeerFunc) error {
panic("not implemented") // TODO: Implement
}
// EachPeerRev iterates from farthest bin to closest
func (*driver) EachPeerRev(_ topology.EachPeerFunc) error {
panic("not implemented") // TODO: Implement
}
func (*driver) SubscribePeersChange() (c <-chan struct{}, unsubscribe func()) {
// TODO: implement if necessary
return c, unsubscribe
}
func (d *driver) MarshalJSON() ([]byte, error) {
var peers []string
for p := range d.receivedPeers {
peers = append(peers, p)
}
return json.Marshal(struct {
Peers []string `json:"peers"`
}{Peers: peers})
}
func (d *driver) String() string {
return fmt.Sprintf("%s", d.receivedPeers)
}
func (d *driver) Close() error {
close(d.quit)
return nil
}
func (d *driver) backoff(tryAfter time.Time) {
d.mtx.Lock()
defer d.mtx.Unlock()
if d.backoffActive {
return
}
d.backoffActive = true
done := make(chan struct{})
ctx, cancel := context.WithCancel(context.Background())
go func() {
defer cancel()
select {
case <-done:
case <-d.quit:
}
}()
go func() {
defer func() { close(done) }()
select {
case <-time.After(time.Until(tryAfter)):
d.mtx.Lock()
d.backoffActive = false
d.mtx.Unlock()
addresses, _ := d.addressBook.Overlays()
for _, addr := range addresses {
select {
case <-d.quit:
return
default:
if err := d.AddPeers(ctx, addr); err != nil {
var e *p2p.ConnectionBackoffError
if errors.As(err, &e) {
d.backoff(e.TryAfter())
return
}
}
}
}
case <-d.quit:
return
}
}()
}
func isConnected(addr swarm.Address, connectedPeers []p2p.Peer) bool {
for _, p := range connectedPeers {
if p.Address.Equal(addr) {
return true
}
}
return false
}
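The doc comments above describe the behavior: every added peer is broadcast to all connected peers, and all connected peers are broadcast back to the new peer. As a rough, hypothetical usage sketch (distilled from the tests below, not part of the removed code), the driver can be wired up with the same mock packages the tests use:

package main

import (
	"context"
	"fmt"
	"io/ioutil"
	"time"

	"github.com/ethersphere/bee/pkg/addressbook"
	"github.com/ethersphere/bee/pkg/discovery/mock"
	"github.com/ethersphere/bee/pkg/logging"
	"github.com/ethersphere/bee/pkg/p2p"
	p2pmock "github.com/ethersphere/bee/pkg/p2p/mock"
	mockstate "github.com/ethersphere/bee/pkg/statestore/mock"
	"github.com/ethersphere/bee/pkg/swarm"
	"github.com/ethersphere/bee/pkg/topology/full"
)

func main() {
	logger := logging.New(ioutil.Discard, 0)
	base := swarm.MustParseHexAddress("ca1e9f3938cc1425c6061b96ad9eb93e134dfe8734ad490164ef20af9d1cf59a")

	// Mocks as used by the tests below: the discovery mock records broadcasts,
	// the p2p mock reports no connected peers, and the address book is backed
	// by an in-memory state store.
	disc := mock.NewDiscovery()
	ab := addressbook.New(mockstate.NewStateStore())
	p2ps := p2pmock.New(p2pmock.WithPeersFunc(func() []p2p.Peer {
		return nil
	}))

	driver := full.New(disc, ab, p2ps, logger, base)
	defer driver.Close()

	// AddPeers only queues the address; the manage goroutine looks it up in
	// the address book (the tests below register it with ab.Put first),
	// connects if necessary and broadcasts it to the connected peers.
	peer := swarm.MustParseHexAddress("ca1e9f3938cc1425c6061b96ad9eb93e134dfe8734ad490164ef20af9d1cf59b")
	_ = driver.AddPeers(context.Background(), peer)

	// Give the manage goroutine a moment; the tests below poll instead.
	time.Sleep(100 * time.Millisecond)
	fmt.Println("broadcasts recorded by the discovery mock:", disc.Broadcasts())
}

Since the peer in this sketch was never registered in the address book and no peers are connected, no broadcasts are recorded; the tests below exercise the connected cases.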
// Copyright 2020 The Swarm Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package full_test
import (
"context"
"errors"
"fmt"
"io/ioutil"
"testing"
"time"
"github.com/ethersphere/bee/pkg/addressbook"
"github.com/ethersphere/bee/pkg/bzz"
"github.com/ethersphere/bee/pkg/crypto"
"github.com/ethersphere/bee/pkg/discovery/mock"
"github.com/ethersphere/bee/pkg/logging"
"github.com/ethersphere/bee/pkg/p2p"
p2pmock "github.com/ethersphere/bee/pkg/p2p/mock"
mockstate "github.com/ethersphere/bee/pkg/statestore/mock"
"github.com/ethersphere/bee/pkg/swarm"
"github.com/ethersphere/bee/pkg/topology"
"github.com/ethersphere/bee/pkg/topology/full"
ma "github.com/multiformats/go-multiaddr"
)
func TestAddPeers(t *testing.T) {
logger := logging.New(ioutil.Discard, 0)
underlay, err := ma.NewMultiaddr("/ip4/127.0.0.1/tcp/1634/p2p/16Uiu2HAkx8ULY8cTXhdVAcMmLcH9AsTKz6uBQ7DPLKRjMLgBVYkS")
if err != nil {
t.Fatal(err)
}
overlay := swarm.MustParseHexAddress("ca1e9f3938cc1425c6061b96ad9eb93e134dfe8734ad490164ef20af9d1cf59a")
pk, err := crypto.GenerateSecp256k1Key()
if err != nil {
t.Fatal(err)
}
bzzAddr, err := bzz.NewAddress(crypto.NewDefaultSigner(pk), underlay, overlay, 1)
if err != nil {
t.Fatal(err)
}
connectedPeers := []p2p.Peer{
{
Address: swarm.MustParseHexAddress("ca1e9f3938cc1425c6061b96ad9eb93e134dfe8734ad490164ef20af9d1cf59b"),
},
{
Address: swarm.MustParseHexAddress("ca1e9f3938cc1425c6061b96ad9eb93e134dfe8734ad490164ef20af9d1cf59c"),
},
{
Address: swarm.MustParseHexAddress("ca1e9f3938cc1425c6061b96ad9eb93e134dfe8734ad490164ef20af9d1cf59d"),
},
}
t.Run("OK - no connected peers", func(t *testing.T) {
discovery := mock.NewDiscovery()
statestore := mockstate.NewStateStore()
ab := addressbook.New(statestore)
p2p := p2pmock.New(p2pmock.WithConnectFunc(func(_ context.Context, addr ma.Multiaddr) (*bzz.Address, error) {
if !addr.Equal(underlay) {
t.Fatalf("expected multiaddr %s, got %s", addr, underlay)
}
return bzzAddr, nil
}))
fullDriver := full.New(discovery, ab, p2p, logger, overlay)
defer fullDriver.Close()
if err := ab.Put(overlay, *bzzAddr); err != nil {
t.Fatal(err)
}
err = fullDriver.AddPeers(context.Background(), overlay)
if err != nil {
t.Fatalf("full conn driver returned err %s", err.Error())
}
expectBroadcastsEventually(t, discovery, 0)
})
t.Run("OK - connected peers - peer already connected", func(t *testing.T) {
discovery := mock.NewDiscovery()
statestore := mockstate.NewStateStore()
ab := addressbook.New(statestore)
alreadyConnected := connectedPeers[0].Address
addrAlreadyConnected, err := bzz.NewAddress(crypto.NewDefaultSigner(pk), underlay, alreadyConnected, 1)
if err != nil {
t.Fatal(err)
}
p2p := p2pmock.New(p2pmock.WithConnectFunc(func(ctx context.Context, addr ma.Multiaddr) (*bzz.Address, error) {
t.Fatal("should not be called")
return nil, nil
}), p2pmock.WithPeersFunc(func() []p2p.Peer {
return connectedPeers
}))
fullDriver := full.New(discovery, ab, p2p, logger, overlay)
defer fullDriver.Close()
err = ab.Put(alreadyConnected, *addrAlreadyConnected)
if err != nil {
t.Fatal(err)
}
err = fullDriver.AddPeers(context.Background(), alreadyConnected)
if err != nil {
t.Fatalf("full conn driver returned err %s", err.Error())
}
expectBroadcastsEventually(t, discovery, 3)
// check newly added node
if err := checkAddresseeRecords(discovery, alreadyConnected, connectedPeers[1:]); err != nil {
t.Fatal(err)
}
// check other nodes
for _, p := range connectedPeers[1:] {
if err := checkAddresseeRecords(discovery, p.Address, connectedPeers[0:1]); err != nil {
t.Fatal(err)
}
}
})
t.Run("OK - connected peers - peer not already connected", func(t *testing.T) {
discovery := mock.NewDiscovery()
statestore := mockstate.NewStateStore()
ab := addressbook.New(statestore)
p2ps := p2pmock.New(p2pmock.WithConnectFunc(func(ctx context.Context, addr ma.Multiaddr) (*bzz.Address, error) {
if !addr.Equal(underlay) {
t.Fatalf("expected multiaddr %s, got %s", addr.String(), underlay)
}
return bzzAddr, nil
}), p2pmock.WithPeersFunc(func() []p2p.Peer {
return connectedPeers
}))
fullDriver := full.New(discovery, ab, p2ps, logger, overlay)
defer fullDriver.Close()
if err := ab.Put(overlay, *bzzAddr); err != nil {
t.Fatal(err)
}
err = fullDriver.AddPeers(context.Background(), overlay)
if err != nil {
t.Fatalf("full conn driver returned err %s", err.Error())
}
expectBroadcastsEventually(t, discovery, 4)
// check newly added node
if err := checkAddresseeRecords(discovery, overlay, connectedPeers); err != nil {
t.Fatal(err)
}
// check other nodes
for _, p := range connectedPeers {
if err := checkAddresseeRecords(discovery, p.Address, []p2p.Peer{{Address: overlay}}); err != nil {
t.Fatal(err)
}
}
})
}
func expectBroadcastsEventually(t *testing.T, discovery *mock.Discovery, expected int) {
for i := 0; i < 100; i++ {
time.Sleep(50 * time.Millisecond)
if discovery.Broadcasts() == expected {
return
}
}
t.Fatalf("broadcasts expected %v, got %v ", expected, discovery.Broadcasts())
}
// TestClosestPeer tests that the ClosestPeer method returns the closest connected peer to a given chunk.
func TestClosestPeer(t *testing.T) {
logger := logging.New(ioutil.Discard, 0)
baseOverlay := swarm.MustParseHexAddress("0000000000000000000000000000000000000000000000000000000000000000") // base is 0000
bzzAddr := &bzz.Address{
Overlay: baseOverlay,
}
connectedPeers := []p2p.Peer{
{
Address: swarm.MustParseHexAddress("8000000000000000000000000000000000000000000000000000000000000000"), // binary 1000 -> po 0 to base
},
{
Address: swarm.MustParseHexAddress("4000000000000000000000000000000000000000000000000000000000000000"), // binary 0100 -> po 1 to base
},
{
Address: swarm.MustParseHexAddress("6000000000000000000000000000000000000000000000000000000000000000"), // binary 0110 -> po 1 to base
},
}
discovery := mock.NewDiscovery()
statestore := mockstate.NewStateStore()
ab := addressbook.New(statestore)
p2ps := p2pmock.New(p2pmock.WithConnectFunc(func(ctx context.Context, addr ma.Multiaddr) (*bzz.Address, error) {
return bzzAddr, nil
}), p2pmock.WithPeersFunc(func() []p2p.Peer {
return connectedPeers
}))
fullDriver := full.New(discovery, ab, p2ps, logger, baseOverlay)
defer fullDriver.Close()
for _, tc := range []struct {
chunkAddress swarm.Address // chunk address to test
expectedPeer int // points to the index of the connectedPeers slice. -1 means self (baseOverlay)
}{
{
chunkAddress: swarm.MustParseHexAddress("7000000000000000000000000000000000000000000000000000000000000000"), // 0111, wants peer 2
expectedPeer: 2,
},
{
chunkAddress: swarm.MustParseHexAddress("c000000000000000000000000000000000000000000000000000000000000000"), // 1100, want peer 0
expectedPeer: 0,
},
{
chunkAddress: swarm.MustParseHexAddress("e000000000000000000000000000000000000000000000000000000000000000"), // 1110, want peer 0
expectedPeer: 0,
},
{
chunkAddress: swarm.MustParseHexAddress("a000000000000000000000000000000000000000000000000000000000000000"), // 1010, want peer 0
expectedPeer: 0,
},
{
chunkAddress: swarm.MustParseHexAddress("4000000000000000000000000000000000000000000000000000000000000000"), // 0100, want peer 1
expectedPeer: 1,
},
{
chunkAddress: swarm.MustParseHexAddress("5000000000000000000000000000000000000000000000000000000000000000"), // 0101, want peer 1
expectedPeer: 1,
},
{
chunkAddress: swarm.MustParseHexAddress("0000001000000000000000000000000000000000000000000000000000000000"), // want self
expectedPeer: -1,
},
} {
peer, err := fullDriver.ClosestPeer(tc.chunkAddress)
if err != nil {
if tc.expectedPeer == -1 && !errors.Is(err, topology.ErrWantSelf) {
t.Fatalf("wanted %v but got %v", topology.ErrWantSelf, err)
}
continue
}
expected := connectedPeers[tc.expectedPeer].Address
if !peer.Equal(expected) {
t.Fatalf("peers not equal. got %s expected %s", peer, expected)
}
}
}
func checkAddresseeRecords(discovery *mock.Discovery, addr swarm.Address, expected []p2p.Peer) error {
got, exists := discovery.AddresseeRecords(addr)
if !exists {
return errors.New("addressee record does not exist")
}
for i, e := range expected {
if !e.Address.Equal(got[i]) {
return fmt.Errorf("addressee record expected %s, got %s ", e.Address.String(), got[i].String())
}
}
return nil
}
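The expectations in TestClosestPeer follow from XOR (Kademlia-style) proximity: the closest peer to a chunk is the one whose address XORed with the chunk address yields the smallest value, the same ordering that swarm.DistanceCmp establishes over the full 32-byte addresses. A standalone sketch of that arithmetic for the first test case, using only the leading byte of each address for brevity:

package main

import "fmt"

func main() {
	// First test case above: chunk 0x70... ("0111"); candidate peers start
	// with 0x80 ("1000"), 0x40 ("0100") and 0x60 ("0110").
	chunk := byte(0x70)
	peers := []byte{0x80, 0x40, 0x60}

	closest, best := byte(0x00), byte(0xff)
	for i, p := range peers {
		d := p ^ chunk // XOR distance on the leading byte only
		fmt.Printf("peer %d (%#x): distance %#x\n", i, p, d)
		if d < best {
			best, closest = d, p
		}
	}
	// Prints 0x60, i.e. connectedPeers[2], matching expectedPeer: 2 above.
	fmt.Printf("closest: %#x\n", closest)
}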