Commit b5312204 authored by acud's avatar acud Committed by GitHub

full connectivity driver (#14)

* topology, discovery, addressbook: add initial interfaces and mocks for a full connectivity topology
parent 285dbed7
// Copyright 2020 The Swarm Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package addressbook
import (
"github.com/ethersphere/bee/pkg/swarm"
ma "github.com/multiformats/go-multiaddr"
)
type GetterPutter interface {
Getter
Putter
}
type Getter interface {
Get(overlay swarm.Address) (addr ma.Multiaddr, exists bool)
}
type Putter interface {
Put(overlay swarm.Address, addr ma.Multiaddr) (exists bool)
}
// Copyright 2020 The Swarm Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package inmem
import (
"sync"
"github.com/ethersphere/bee/pkg/addressbook"
"github.com/ethersphere/bee/pkg/swarm"
ma "github.com/multiformats/go-multiaddr"
)
type inmem struct {
mtx sync.Mutex
entries map[string]peerEntry // key: overlay in string value, value: peerEntry
}
type peerEntry struct {
overlay swarm.Address
multiaddr ma.Multiaddr
}
func New() addressbook.GetterPutter {
return &inmem{
entries: make(map[string]peerEntry),
}
}
func (i *inmem) Get(overlay swarm.Address) (addr ma.Multiaddr, exists bool) {
i.mtx.Lock()
defer i.mtx.Unlock()
val, exists := i.entries[overlay.String()]
return val.multiaddr, exists
}
func (i *inmem) Put(overlay swarm.Address, addr ma.Multiaddr) (exists bool) {
i.mtx.Lock()
defer i.mtx.Unlock()
_, e := i.entries[overlay.String()]
i.entries[overlay.String()] = peerEntry{overlay: overlay, multiaddr: addr}
return e
}
// Copyright 2020 The Swarm Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package inmem
import (
"testing"
"github.com/ethersphere/bee/pkg/swarm"
ma "github.com/multiformats/go-multiaddr"
)
func TestInMemStore(t *testing.T) {
mem := New()
addr1 := swarm.NewAddress([]byte{0, 1, 2, 3})
addr2 := swarm.NewAddress([]byte{0, 1, 2, 4})
multiaddr, err := ma.NewMultiaddr("/ip4/1.1.1.1")
if err != nil {
t.Fatal(err)
}
//var beep ma.Multiaddr
exists := mem.Put(addr1, multiaddr)
if exists {
t.Fatal("object exists in store but shouldnt")
}
_, exists = mem.Get(addr2)
if exists {
t.Fatal("value found in store but should not have been")
}
v, exists := mem.Get(addr1)
if !exists {
t.Fatal("value not found in store but should have been")
}
if multiaddr.String() != v.String() {
t.Fatalf("value retrieved from store not equal to original stored address: %v, want %v", v, multiaddr)
}
}
// Copyright 2020 The Swarm Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package discovery
import (
"github.com/ethersphere/bee/pkg/swarm"
ma "github.com/multiformats/go-multiaddr"
)
type Driver interface {
BroadcastPeer(addressee swarm.Address, overlay swarm.Address, addr ma.Multiaddr) error
}
// Copyright 2020 The Swarm Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package mock
import (
"sync"
"github.com/ethersphere/bee/pkg/swarm"
ma "github.com/multiformats/go-multiaddr"
)
type Discovery struct {
mtx sync.Mutex
ctr int //how many ops
records map[string]broadcastRecord
}
type broadcastRecord struct {
overlay swarm.Address
multiaddr ma.Multiaddr
}
func NewDiscovery() *Discovery {
return &Discovery{
records: make(map[string]broadcastRecord),
}
}
func (d *Discovery) BroadcastPeer(addressee swarm.Address, overlay swarm.Address, addr ma.Multiaddr) error {
d.mtx.Lock()
defer d.mtx.Unlock()
d.ctr++
d.records[addressee.String()] = broadcastRecord{overlay: overlay, multiaddr: addr}
return nil
}
func (d *Discovery) Broadcasts() int {
d.mtx.Lock()
defer d.mtx.Unlock()
return d.ctr
}
func (d *Discovery) AddresseeRecord(addressee swarm.Address) (overlay swarm.Address, addr ma.Multiaddr) {
d.mtx.Lock()
defer d.mtx.Unlock()
rec, exists := d.records[addressee.String()]
if !exists {
return swarm.Address{}, nil
}
return rec.overlay, rec.multiaddr
}
...@@ -9,8 +9,11 @@ import ( ...@@ -9,8 +9,11 @@ import (
"errors" "errors"
"testing" "testing"
"github.com/ethersphere/bee/pkg/addressbook/inmem"
"github.com/ethersphere/bee/pkg/discovery/mock"
"github.com/ethersphere/bee/pkg/p2p" "github.com/ethersphere/bee/pkg/p2p"
"github.com/ethersphere/bee/pkg/p2p/libp2p" "github.com/ethersphere/bee/pkg/p2p/libp2p"
"github.com/ethersphere/bee/pkg/topology/full"
) )
func TestAddresses(t *testing.T) { func TestAddresses(t *testing.T) {
...@@ -338,3 +341,40 @@ func TestConnectWithDisabledQUICAndWSTransports(t *testing.T) { ...@@ -338,3 +341,40 @@ func TestConnectWithDisabledQUICAndWSTransports(t *testing.T) {
expectPeers(t, s2, overlay1) expectPeers(t, s2, overlay1)
expectPeersEventually(t, s1, overlay2) expectPeersEventually(t, s1, overlay2)
} }
func TestConnectWithMockDiscovery(t *testing.T) {
o1 := libp2p.Options{
NetworkID: 1,
}
s1, _, cleanup1 := newService(t, o1)
defer cleanup1()
disc2 := mock.NewDiscovery()
ab2 := inmem.New()
o2 := libp2p.Options{
NetworkID: 1,
TopologyDriver: full.New(disc2, ab2),
AddressBook: ab2,
}
s2, _, cleanup2 := newService(t, o2)
defer cleanup2()
addrs, err := s1.Addresses()
if err != nil {
t.Fatal(err)
}
addr := addrs[0]
overlay, err := s2.Connect(context.Background(), addr)
if err != nil {
t.Fatal(err)
}
if v := disc2.Broadcasts(); v != 1 {
t.Fatalf("expected 1 peer broadcasts, got %d", v)
}
if err := s2.Disconnect(overlay); err != nil {
t.Fatal(err)
}
}
...@@ -13,10 +13,12 @@ import ( ...@@ -13,10 +13,12 @@ import (
"github.com/libp2p/go-libp2p-core/helpers" "github.com/libp2p/go-libp2p-core/helpers"
"github.com/ethersphere/bee/pkg/addressbook"
"github.com/ethersphere/bee/pkg/logging" "github.com/ethersphere/bee/pkg/logging"
"github.com/ethersphere/bee/pkg/p2p" "github.com/ethersphere/bee/pkg/p2p"
handshake "github.com/ethersphere/bee/pkg/p2p/libp2p/internal/handshake" handshake "github.com/ethersphere/bee/pkg/p2p/libp2p/internal/handshake"
"github.com/ethersphere/bee/pkg/swarm" "github.com/ethersphere/bee/pkg/swarm"
"github.com/ethersphere/bee/pkg/topology"
"github.com/libp2p/go-libp2p" "github.com/libp2p/go-libp2p"
autonat "github.com/libp2p/go-libp2p-autonat-svc" autonat "github.com/libp2p/go-libp2p-autonat-svc"
crypto "github.com/libp2p/go-libp2p-core/crypto" crypto "github.com/libp2p/go-libp2p-core/crypto"
...@@ -39,18 +41,22 @@ type Service struct { ...@@ -39,18 +41,22 @@ type Service struct {
networkID int32 networkID int32
handshakeService *handshake.Service handshakeService *handshake.Service
peers *peerRegistry peers *peerRegistry
topologyDriver topology.Driver
addressBook addressbook.Putter
logger logging.Logger logger logging.Logger
} }
type Options struct { type Options struct {
PrivateKey *ecdsa.PrivateKey PrivateKey *ecdsa.PrivateKey
Overlay swarm.Address Overlay swarm.Address
Addr string Addr string
DisableWS bool DisableWS bool
DisableQUIC bool DisableQUIC bool
Bootnodes []string Bootnodes []string
NetworkID int32 NetworkID int32
Logger logging.Logger AddressBook addressbook.GetterPutter
TopologyDriver topology.Driver
Logger logging.Logger
} }
func New(ctx context.Context, o Options) (*Service, error) { func New(ctx context.Context, o Options) (*Service, error) {
...@@ -146,6 +152,8 @@ func New(ctx context.Context, o Options) (*Service, error) { ...@@ -146,6 +152,8 @@ func New(ctx context.Context, o Options) (*Service, error) {
networkID: o.NetworkID, networkID: o.NetworkID,
handshakeService: handshake.New(peerRegistry, o.Overlay, o.NetworkID, o.Logger), handshakeService: handshake.New(peerRegistry, o.Overlay, o.NetworkID, o.Logger),
peers: peerRegistry, peers: peerRegistry,
addressBook: o.AddressBook,
topologyDriver: o.TopologyDriver,
logger: o.Logger, logger: o.Logger,
} }
...@@ -281,6 +289,13 @@ func (s *Service) Connect(ctx context.Context, addr ma.Multiaddr) (overlay swarm ...@@ -281,6 +289,13 @@ func (s *Service) Connect(ctx context.Context, addr ma.Multiaddr) (overlay swarm
} }
s.peers.add(stream.Conn(), i.Address) s.peers.add(stream.Conn(), i.Address)
s.addressBook.Put(i.Address, addr)
err = s.topologyDriver.AddPeer(i.Address)
if err != nil {
return swarm.Address{}, fmt.Errorf("topology addpeer: %w", err)
}
s.metrics.CreatedConnectionCount.Inc() s.metrics.CreatedConnectionCount.Inc()
s.logger.Infof("peer %s connected", i.Address) s.logger.Infof("peer %s connected", i.Address)
return i.Address, nil return i.Address, nil
......
...@@ -12,11 +12,14 @@ import ( ...@@ -12,11 +12,14 @@ import (
"testing" "testing"
"time" "time"
"github.com/ethersphere/bee/pkg/addressbook/inmem"
"github.com/ethersphere/bee/pkg/crypto" "github.com/ethersphere/bee/pkg/crypto"
"github.com/ethersphere/bee/pkg/discovery/mock"
"github.com/ethersphere/bee/pkg/logging" "github.com/ethersphere/bee/pkg/logging"
"github.com/ethersphere/bee/pkg/p2p" "github.com/ethersphere/bee/pkg/p2p"
"github.com/ethersphere/bee/pkg/p2p/libp2p" "github.com/ethersphere/bee/pkg/p2p/libp2p"
"github.com/ethersphere/bee/pkg/swarm" "github.com/ethersphere/bee/pkg/swarm"
"github.com/ethersphere/bee/pkg/topology/full"
"github.com/multiformats/go-multiaddr" "github.com/multiformats/go-multiaddr"
) )
...@@ -55,6 +58,14 @@ func newService(t *testing.T, o libp2p.Options) (s *libp2p.Service, overlay swar ...@@ -55,6 +58,14 @@ func newService(t *testing.T, o libp2p.Options) (s *libp2p.Service, overlay swar
if o.Addr == "" { if o.Addr == "" {
o.Addr = ":0" o.Addr = ":0"
} }
if o.AddressBook == nil {
o.AddressBook = inmem.New()
}
if o.TopologyDriver == nil {
disc := mock.NewDiscovery()
o.TopologyDriver = full.New(disc, o.AddressBook)
}
ctx, cancel := context.WithCancel(context.Background()) ctx, cancel := context.WithCancel(context.Background())
s, err := libp2p.New(ctx, o) s, err := libp2p.New(ctx, o)
......
...@@ -28,11 +28,6 @@ type Stream interface { ...@@ -28,11 +28,6 @@ type Stream interface {
io.Closer io.Closer
} }
// PeerSuggester suggests a peer to retrieve a chunk from
type PeerSuggester interface {
SuggestPeer(addr swarm.Address) (peerAddr swarm.Address, err error)
}
type ProtocolSpec struct { type ProtocolSpec struct {
Name string Name string
Version string Version string
......
...@@ -14,6 +14,7 @@ import ( ...@@ -14,6 +14,7 @@ import (
pb "github.com/ethersphere/bee/pkg/retrieval/pb" pb "github.com/ethersphere/bee/pkg/retrieval/pb"
"github.com/ethersphere/bee/pkg/storage" "github.com/ethersphere/bee/pkg/storage"
"github.com/ethersphere/bee/pkg/swarm" "github.com/ethersphere/bee/pkg/swarm"
"github.com/ethersphere/bee/pkg/topology"
) )
const ( const (
...@@ -24,16 +25,16 @@ const ( ...@@ -24,16 +25,16 @@ const (
type Service struct { type Service struct {
streamer p2p.Streamer streamer p2p.Streamer
peerSuggester p2p.PeerSuggester peerSuggester topology.ChunkPeerer
storer storage.Storer storer storage.Storer
logger logging.Logger logger logging.Logger
} }
type Options struct { type Options struct {
Streamer p2p.Streamer Streamer p2p.Streamer
PeerSuggester p2p.PeerSuggester ChunkPeerer topology.ChunkPeerer
Storer storage.Storer Storer storage.Storer
Logger logging.Logger Logger logging.Logger
} }
type Storer interface { type Storer interface {
...@@ -42,7 +43,7 @@ type Storer interface { ...@@ -42,7 +43,7 @@ type Storer interface {
func New(o Options) *Service { func New(o Options) *Service {
return &Service{ return &Service{
streamer: o.Streamer, streamer: o.Streamer,
peerSuggester: o.PeerSuggester, peerSuggester: o.ChunkPeerer,
storer: o.Storer, storer: o.Storer,
logger: o.Logger, logger: o.Logger,
} }
...@@ -62,7 +63,7 @@ func (s *Service) Protocol() p2p.ProtocolSpec { ...@@ -62,7 +63,7 @@ func (s *Service) Protocol() p2p.ProtocolSpec {
} }
func (s *Service) RetrieveChunk(ctx context.Context, addr swarm.Address) (data []byte, err error) { func (s *Service) RetrieveChunk(ctx context.Context, addr swarm.Address) (data []byte, err error) {
peerID, err := s.peerSuggester.SuggestPeer(addr) peerID, err := s.peerSuggester.ChunkPeer(addr)
if err != nil { if err != nil {
return nil, err return nil, err
} }
......
...@@ -57,10 +57,10 @@ func TestDelivery(t *testing.T) { ...@@ -57,10 +57,10 @@ func TestDelivery(t *testing.T) {
return v, err return v, err
}} }}
client := retrieval.New(retrieval.Options{ client := retrieval.New(retrieval.Options{
Streamer: recorder, Streamer: recorder,
PeerSuggester: ps, ChunkPeerer: ps,
Storer: clientMockStorer, Storer: clientMockStorer,
Logger: logger, Logger: logger,
}) })
ctx, cancel := context.WithTimeout(context.Background(), testTimeout) ctx, cancel := context.WithTimeout(context.Background(), testTimeout)
defer cancel() defer cancel()
...@@ -71,7 +71,7 @@ func TestDelivery(t *testing.T) { ...@@ -71,7 +71,7 @@ func TestDelivery(t *testing.T) {
if !bytes.Equal(v, reqData) { if !bytes.Equal(v, reqData) {
t.Fatalf("request and response data not equal. got %s want %s", v, reqData) t.Fatalf("request and response data not equal. got %s want %s", v, reqData)
} }
peerID, _ := ps.SuggestPeer(swarm.ZeroAddress) peerID, _ := ps.ChunkPeer(swarm.ZeroAddress)
records, err := recorder.Records(peerID, "retrieval", "1.0.0", "retrieval") records, err := recorder.Records(peerID, "retrieval", "1.0.0", "retrieval")
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
...@@ -120,6 +120,6 @@ type mockPeerSuggester struct { ...@@ -120,6 +120,6 @@ type mockPeerSuggester struct {
spFunc func(swarm.Address) (swarm.Address, error) spFunc func(swarm.Address) (swarm.Address, error)
} }
func (v mockPeerSuggester) SuggestPeer(addr swarm.Address) (swarm.Address, error) { func (v mockPeerSuggester) ChunkPeer(addr swarm.Address) (swarm.Address, error) {
return v.spFunc(addr) return v.spFunc(addr)
} }
// Copyright 2020 The Swarm Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package full
import (
"math/rand"
"sync"
"time"
"github.com/ethersphere/bee/pkg/addressbook"
"github.com/ethersphere/bee/pkg/discovery"
"github.com/ethersphere/bee/pkg/swarm"
"github.com/ethersphere/bee/pkg/topology"
)
func init() {
rand.Seed(time.Now().UnixNano())
}
var _ topology.Driver = (*driver)(nil)
// driver drives the connectivity between nodes. It is a basic implementation of a connectivity driver.
// that enabled full connectivity in the sense that:
// - Every peer which is added to the driver gets broadcasted to every other peer regardless of its address.
// - A random peer is picked when asking for a peer to retrieve an arbitrary chunk (PeerSuggester interface).
type driver struct {
mtx sync.Mutex
connected map[string]swarm.Address
discovery discovery.Driver
addressBook addressbook.Getter
}
func New(disc discovery.Driver, addressBook addressbook.Getter) topology.Driver {
return &driver{
connected: make(map[string]swarm.Address),
discovery: disc,
addressBook: addressBook,
}
}
// AddPeer adds a new peer to the topology driver.
// the peer would be subsequently broadcasted to all connected peers.
func (d *driver) AddPeer(overlay swarm.Address) error {
d.mtx.Lock()
defer d.mtx.Unlock()
d.connected[overlay.String()] = overlay
ma, exists := d.addressBook.Get(overlay)
if !exists {
return topology.ErrNotFound
}
for _, addressee := range d.connected {
err := d.discovery.BroadcastPeer(addressee, overlay, ma)
if err != nil {
return err
}
}
return nil
}
// ChunkPeer is used to suggest a peer to ask a certain chunk from.
func (d *driver) ChunkPeer(addr swarm.Address) (peerAddr swarm.Address, err error) {
d.mtx.Lock()
defer d.mtx.Unlock()
if len(d.connected) == 0 {
return swarm.Address{}, topology.ErrNotFound
}
itemIdx := rand.Intn(len(d.connected))
i := 0
for _, v := range d.connected {
if i == itemIdx {
return v, nil
}
i++
}
return swarm.Address{}, topology.ErrNotFound
}
// Copyright 2020 The Swarm Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package topology
import (
"errors"
"github.com/ethersphere/bee/pkg/swarm"
)
var ErrNotFound = errors.New("no peer found")
type Driver interface {
AddPeer(overlay swarm.Address) error
ChunkPeerer
}
type ChunkPeerer interface {
ChunkPeer(addr swarm.Address) (peerAddr swarm.Address, err error)
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment