Commit cefe893b authored by Petar Radovic's avatar Petar Radovic Committed by GitHub

Hive alternative (#22)

 hive - initial implementation
parent 8931a3ef
......@@ -45,3 +45,21 @@ func (i *inmem) Put(overlay swarm.Address, addr ma.Multiaddr) (exists bool) {
i.entries[overlay.String()] = peerEntry{overlay: overlay, multiaddr: addr}
return e
}
func (i *inmem) Overlays() []swarm.Address {
keys := make([]swarm.Address, 0, len(i.entries))
for k := range i.entries {
keys = append(keys, swarm.MustParseHexAddress(k))
}
return keys
}
func (i *inmem) Multiaddresses() []ma.Multiaddr {
values := make([]ma.Multiaddr, 0, len(i.entries))
for _, v := range i.entries {
values = append(values, v.multiaddr)
}
return values
}
......@@ -5,11 +5,17 @@
package discovery
import (
"github.com/ethersphere/bee/pkg/swarm"
"context"
"github.com/ethersphere/bee/pkg/swarm"
ma "github.com/multiformats/go-multiaddr"
)
type BroadcastRecord struct {
Overlay swarm.Address
Addr ma.Multiaddr
}
type Driver interface {
BroadcastPeer(addressee swarm.Address, overlay swarm.Address, addr ma.Multiaddr) error
BroadcastPeers(ctx context.Context, addressee swarm.Address, peers ...BroadcastRecord) error
}
......@@ -5,34 +5,35 @@
package mock
import (
"context"
"sync"
"github.com/ethersphere/bee/pkg/discovery"
"github.com/ethersphere/bee/pkg/swarm"
ma "github.com/multiformats/go-multiaddr"
)
type Discovery struct {
mtx sync.Mutex
ctr int //how many ops
records map[string]broadcastRecord
}
type broadcastRecord struct {
overlay swarm.Address
multiaddr ma.Multiaddr
records map[string]discovery.BroadcastRecord
}
func NewDiscovery() *Discovery {
return &Discovery{
records: make(map[string]broadcastRecord),
records: make(map[string]discovery.BroadcastRecord),
}
}
func (d *Discovery) BroadcastPeer(addressee swarm.Address, overlay swarm.Address, addr ma.Multiaddr) error {
d.mtx.Lock()
defer d.mtx.Unlock()
d.ctr++
d.records[addressee.String()] = broadcastRecord{overlay: overlay, multiaddr: addr}
func (d *Discovery) BroadcastPeers(ctx context.Context, addressee swarm.Address, peers ...discovery.BroadcastRecord) error {
for _, peer := range peers {
d.mtx.Lock()
d.ctr++
d.records[addressee.String()] = discovery.BroadcastRecord{Overlay: peer.Overlay, Addr: peer.Addr}
d.mtx.Unlock()
}
return nil
}
......@@ -49,5 +50,5 @@ func (d *Discovery) AddresseeRecord(addressee swarm.Address) (overlay swarm.Addr
if !exists {
return swarm.Address{}, nil
}
return rec.overlay, rec.multiaddr
return rec.Overlay, rec.Addr
}
// Copyright 2020 The Swarm Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package hive
var MaxBatchSize = maxBatchSize
// Copyright 2020 The Swarm Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package hive
import (
"context"
"fmt"
"time"
"github.com/ethersphere/bee/pkg/addressbook"
"github.com/ethersphere/bee/pkg/discovery"
"github.com/ethersphere/bee/pkg/hive/pb"
"github.com/ethersphere/bee/pkg/logging"
"github.com/ethersphere/bee/pkg/p2p"
"github.com/ethersphere/bee/pkg/p2p/protobuf"
"github.com/ethersphere/bee/pkg/swarm"
ma "github.com/multiformats/go-multiaddr"
)
const (
protocolName = "hive"
protocolVersion = "1.0.0"
peersStreamName = "peers"
messageTimeout = 5 * time.Second // maximum allowed time for a message to be read or written.
maxBatchSize = 50
)
type Service struct {
streamer p2p.Streamer
addressBook addressbook.GetterPutter
logger logging.Logger
}
type Options struct {
Streamer p2p.Streamer
AddressBook addressbook.GetterPutter
Logger logging.Logger
}
func New(o Options) *Service {
return &Service{
streamer: o.Streamer,
logger: o.Logger,
addressBook: o.AddressBook,
}
}
func (s *Service) Protocol() p2p.ProtocolSpec {
return p2p.ProtocolSpec{
Name: protocolName,
Version: protocolVersion,
StreamSpecs: []p2p.StreamSpec{
{
Name: peersStreamName,
Handler: s.peersHandler,
},
},
}
}
func (s *Service) BroadcastPeers(ctx context.Context, addressee swarm.Address, peers ...discovery.BroadcastRecord) error {
max := maxBatchSize
for len(peers) > 0 {
if max > len(peers) {
max = len(peers)
}
if err := s.sendPeers(ctx, addressee, peers[:max]); err != nil {
return err
}
peers = peers[max:]
}
return nil
}
func (s *Service) sendPeers(ctx context.Context, peer swarm.Address, peers []discovery.BroadcastRecord) error {
stream, err := s.streamer.NewStream(ctx, peer, protocolName, protocolVersion, peersStreamName)
if err != nil {
return fmt.Errorf("new stream: %w", err)
}
defer stream.Close()
w, _ := protobuf.NewWriterAndReader(stream)
var peersRequest pb.Peers
for _, p := range peers {
peersRequest.Peers = append(peersRequest.Peers, &pb.BzzAddress{
Overlay: p.Overlay.Bytes(),
Underlay: p.Addr.String(),
})
}
if err := w.WriteMsg(&peersRequest); err != nil {
return fmt.Errorf("write Peers message: %w", err)
}
return stream.FullClose()
}
func (s *Service) peersHandler(peer p2p.Peer, stream p2p.Stream) error {
defer stream.Close()
_, r := protobuf.NewWriterAndReader(stream)
var peersReq pb.Peers
if err := r.ReadMsgWithTimeout(messageTimeout, &peersReq); err != nil {
return fmt.Errorf("read requestPeers message: %w", err)
}
for _, newPeer := range peersReq.Peers {
addr, err := ma.NewMultiaddr(newPeer.Underlay)
if err != nil {
s.logger.Infof("Skipping peer in response %s: %w", newPeer, err)
continue
}
// todo: this might be changed depending on where do we decide to connect peers
s.addressBook.Put(swarm.NewAddress(newPeer.Overlay), addr)
}
return nil
}
// Copyright 2020 The Swarm Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package hive_test
import (
"bytes"
"context"
"fmt"
"io/ioutil"
"math/rand"
"reflect"
"sort"
"strconv"
"testing"
"time"
ma "github.com/multiformats/go-multiaddr"
"github.com/ethersphere/bee/pkg/addressbook/inmem"
"github.com/ethersphere/bee/pkg/discovery"
"github.com/ethersphere/bee/pkg/hive"
"github.com/ethersphere/bee/pkg/hive/pb"
"github.com/ethersphere/bee/pkg/logging"
"github.com/ethersphere/bee/pkg/p2p/protobuf"
"github.com/ethersphere/bee/pkg/p2p/streamtest"
"github.com/ethersphere/bee/pkg/swarm"
)
type AddressExporter interface {
Overlays() []swarm.Address
Multiaddresses() []ma.Multiaddr
}
func TestBroadcastPeers(t *testing.T) {
rand.Seed(time.Now().UnixNano())
logger := logging.New(ioutil.Discard, 0)
// populate all expected and needed random resources for 2 full batches
// tests cases that uses fewer resources can use sub-slices of this data
var multiaddrs []ma.Multiaddr
var addrs []swarm.Address
var records []discovery.BroadcastRecord
var wantMsgs []pb.Peers
for i := 0; i < 2; i++ {
wantMsgs = append(wantMsgs, pb.Peers{Peers: []*pb.BzzAddress{}})
}
for i := 0; i < 2*hive.MaxBatchSize; i++ {
ma, err := ma.NewMultiaddr("/ip4/127.0.0.1/udp/" + strconv.Itoa(i))
if err != nil {
t.Fatal(err)
}
multiaddrs = append(multiaddrs, ma)
addrs = append(addrs, swarm.NewAddress(createRandomBytes()))
wantMsgs[i/hive.MaxBatchSize].Peers = append(wantMsgs[i/hive.MaxBatchSize].Peers, &pb.BzzAddress{Overlay: addrs[i].Bytes(), Underlay: multiaddrs[i].String()})
records = append(records, discovery.BroadcastRecord{Overlay: addrs[i], Addr: multiaddrs[i]})
}
testCases := map[string]struct {
addresee swarm.Address
peers []discovery.BroadcastRecord
wantMsgs []pb.Peers
wantKeys []swarm.Address
wantValues []ma.Multiaddr
}{
"OK - single record": {
addresee: swarm.MustParseHexAddress("ca1e9f3938cc1425c6061b96ad9eb93e134dfe8734ad490164ef20af9d1cf59c"),
peers: []discovery.BroadcastRecord{{Overlay: addrs[0], Addr: multiaddrs[0]}},
wantMsgs: []pb.Peers{{Peers: wantMsgs[0].Peers[:1]}},
wantKeys: []swarm.Address{addrs[0]},
wantValues: []ma.Multiaddr{multiaddrs[0]},
},
"OK - single batch - multiple records": {
addresee: swarm.MustParseHexAddress("ca1e9f3938cc1425c6061b96ad9eb93e134dfe8734ad490164ef20af9d1cf59c"),
peers: records[:15],
wantMsgs: []pb.Peers{{Peers: wantMsgs[0].Peers[:15]}},
wantKeys: addrs[:15],
wantValues: multiaddrs[:15],
},
"OK - single batch - max number of records": {
addresee: swarm.MustParseHexAddress("ca1e9f3938cc1425c6061b96ad9eb93e134dfe8734ad490164ef20af9d1cf59c"),
peers: records[:hive.MaxBatchSize],
wantMsgs: []pb.Peers{{Peers: wantMsgs[0].Peers[:hive.MaxBatchSize]}},
wantKeys: addrs[:hive.MaxBatchSize],
wantValues: multiaddrs[:hive.MaxBatchSize],
},
"OK - multiple batches": {
addresee: swarm.MustParseHexAddress("ca1e9f3938cc1425c6061b96ad9eb93e134dfe8734ad490164ef20af9d1cf59c"),
peers: records[:hive.MaxBatchSize+10],
wantMsgs: []pb.Peers{{Peers: wantMsgs[0].Peers}, {Peers: wantMsgs[1].Peers[:10]}},
wantKeys: addrs[:hive.MaxBatchSize+10],
wantValues: multiaddrs[:hive.MaxBatchSize+10],
},
"OK - multiple batches - max number of records": {
addresee: swarm.MustParseHexAddress("ca1e9f3938cc1425c6061b96ad9eb93e134dfe8734ad490164ef20af9d1cf59c"),
peers: records[:2*hive.MaxBatchSize],
wantMsgs: []pb.Peers{{Peers: wantMsgs[0].Peers}, {Peers: wantMsgs[1].Peers}},
wantKeys: addrs[:2*hive.MaxBatchSize],
wantValues: multiaddrs[:2*hive.MaxBatchSize],
},
}
for _, tc := range testCases {
addressbook := inmem.New()
exporter, ok := addressbook.(AddressExporter)
if !ok {
t.Fatal("could not type assert AddressExporter")
}
// create a hive server that handles the incoming stream
server := hive.New(hive.Options{
Logger: logger,
AddressBook: addressbook,
})
// setup the stream recorder to record stream data
recorder := streamtest.New(
streamtest.WithProtocols(server.Protocol()),
)
// create a hive client that will do broadcast
client := hive.New(hive.Options{
Streamer: recorder,
Logger: logger,
})
if err := client.BroadcastPeers(context.Background(), tc.addresee, tc.peers...); err != nil {
t.Fatal(err)
}
// get a record for this stream
records, err := recorder.Records(tc.addresee, "hive", "1.0.0", "peers")
if err != nil {
t.Fatal(err)
}
if l := len(records); l != len(tc.wantMsgs) {
t.Fatalf("got %v records, want %v", l, len(tc.wantMsgs))
}
// there is a one record per batch (wantMsg)
for i, record := range records {
messages, err := readAndAssertPeersMsgs(record.In(), 1)
if err != nil {
t.Fatal(err)
}
if fmt.Sprint(messages[0]) != fmt.Sprint(tc.wantMsgs[i]) {
t.Errorf("Messages got %v, want %v", messages, tc.wantMsgs)
}
}
if !compareOverlays(exporter.Overlays(), tc.wantKeys) {
t.Errorf("Overlays got %v, want %v", exporter.Overlays(), tc.wantKeys)
}
if !compareMultiaddrses(exporter.Multiaddresses(), tc.wantValues) {
t.Errorf("Multiaddresses got %v, want %v", exporter.Multiaddresses(), tc.wantValues)
}
}
}
func compareOverlays(keys []swarm.Address, wantKeys []swarm.Address) bool {
var stringKeys []string
for _, k := range keys {
stringKeys = append(stringKeys, k.String())
}
var stringWantKeys []string
for _, k := range wantKeys {
stringWantKeys = append(stringWantKeys, k.String())
}
sort.Strings(stringKeys)
sort.Strings(stringWantKeys)
return reflect.DeepEqual(stringKeys, stringWantKeys)
}
func compareMultiaddrses(values []ma.Multiaddr, wantValues []ma.Multiaddr) bool {
var stringVal []string
for _, v := range values {
stringVal = append(stringVal, v.String())
}
var stringWantVal []string
for _, v := range wantValues {
stringWantVal = append(stringWantVal, v.String())
}
sort.Strings(stringVal)
sort.Strings(stringWantVal)
return reflect.DeepEqual(stringVal, stringWantVal)
}
func readAndAssertPeersMsgs(in []byte, expectedLen int) ([]pb.Peers, error) {
messages, err := protobuf.ReadMessages(
bytes.NewReader(in),
func() protobuf.Message {
return new(pb.Peers)
},
)
if err != nil {
return nil, err
}
if len(messages) != expectedLen {
return nil, fmt.Errorf("got %v messages, want %v", len(messages), expectedLen)
}
var peers []pb.Peers
for _, m := range messages {
peers = append(peers, *m.(*pb.Peers))
}
return peers, nil
}
func createRandomBytes() []byte {
randBytes := make([]byte, 32)
rand.Read(randBytes)
return randBytes
}
// Copyright 2020 The Swarm Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
//go:generate sh -c "protoc -I . -I \"$(go list -f '{{ .Dir }}' -m github.com/gogo/protobuf)/protobuf\" --gogofaster_out=. hive.proto"
// Package pb holds only Protocol Buffer definitions and generated code.
package pb
This diff is collapsed.
syntax = "proto3";
package pb;
message Peers {
repeated BzzAddress peers = 1;
}
message BzzAddress {
bytes Overlay = 1;
string Underlay = 2;
}
......@@ -388,7 +388,7 @@ func TestConnectWithMockDiscovery(t *testing.T) {
t.Fatal(err)
}
if v := disc2.Broadcasts(); v != 1 {
t.Fatalf("expected 1 peer broadcasts, got %d", v)
if v := disc2.Broadcasts(); v != 2 {
t.Fatalf("expected 2 peer broadcasts, got %d", v)
}
}
......@@ -20,6 +20,7 @@ const (
ProtocolName = "handshake"
ProtocolVersion = "1.0.0"
StreamName = "handshake"
messageTimeout = 5 * time.Second // maximum allowed time for a message to be read or written.
)
// ErrNetworkIDIncompatible should be returned by handshake handlers if
......@@ -30,9 +31,6 @@ var ErrNetworkIDIncompatible = errors.New("incompatible network ID")
// the handshake response has been received by an already processed peer.
var ErrHandshakeDuplicate = errors.New("duplicate handshake")
// messageTimeout is the maximal allowed time for a message to be read or written.
var messageTimeout = 5 * time.Second
// PeerFinder has the information if the peer already exists in swarm.
type PeerFinder interface {
Exists(overlay swarm.Address) (found bool)
......
......@@ -5,6 +5,7 @@
package full
import (
"context"
"math/rand"
"sync"
"time"
......@@ -24,7 +25,7 @@ var _ topology.Driver = (*driver)(nil)
// driver drives the connectivity between nodes. It is a basic implementation of a connectivity driver.
// that enabled full connectivity in the sense that:
// - Every peer which is added to the driver gets broadcasted to every other peer regardless of its address.
// - A random peer is picked when asking for a peer to retrieve an arbitrary chunk (PeerSuggester interface).
// - A random peer is picked when asking for a peer to retrieve an arbitrary chunk (Peerer interface).
type driver struct {
mtx sync.Mutex
connected map[string]swarm.Address
......@@ -41,7 +42,8 @@ func New(disc discovery.Driver, addressBook addressbook.Getter) topology.Driver
}
// AddPeer adds a new peer to the topology driver.
// the peer would be subsequently broadcasted to all connected peers.
// The peer would be subsequently broadcasted to all connected peers.
// All conneceted peers are also broadcasted to the new peer.
func (d *driver) AddPeer(overlay swarm.Address) error {
d.mtx.Lock()
defer d.mtx.Unlock()
......@@ -51,14 +53,27 @@ func (d *driver) AddPeer(overlay swarm.Address) error {
return topology.ErrNotFound
}
var connectedNodes []discovery.BroadcastRecord
for _, addressee := range d.connected {
err := d.discovery.BroadcastPeer(addressee, overlay, ma)
cma, exists := d.addressBook.Get(addressee)
if !exists {
return topology.ErrNotFound
}
err := d.discovery.BroadcastPeers(context.Background(), addressee, discovery.BroadcastRecord{Overlay: overlay, Addr: ma})
if err != nil {
return err
}
connectedNodes = append(connectedNodes, discovery.BroadcastRecord{Overlay: addressee, Addr: cma})
}
err := d.discovery.BroadcastPeers(context.Background(), overlay, connectedNodes...)
if err != nil {
return err
}
// add peer in the end to avoid broadcast to itself
// add new node to connected nodes to avoid double broadcasts
d.connected[overlay.String()] = overlay
return nil
}
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment