Commit 8f780e3a authored by acud's avatar acud Committed by GitHub

[bee #47, #56]: kademlia phase 0+1 (#155)

* initial kademlia implementation
* implement `ClosestPeer` so that retrievals and push sync will work on kademlia
* hook up kademlia to be the default topology, instead of `full`
* add peer broadcasting functionality upon connecting to a peer
* add peer announcement on dial-in
parent fbf06262
......@@ -64,6 +64,9 @@ func (s *server) setupRouting() {
router.Handle("/chunks-pin", jsonhttp.MethodHandler{
"GET": http.HandlerFunc(s.listPinnedChunks),
})
router.Handle("/topology", jsonhttp.MethodHandler{
"GET": http.HandlerFunc(s.topologyHandler),
})
baseRouter.Handle("/", web.ChainHandlers(
logging.NewHTTPAccessLogHandler(s.Logger, logrus.InfoLevel, "debug api access"),
......
// Copyright 2020 The Swarm Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package debugapi
import (
"bytes"
"encoding/json"
"io"
"net/http"
"github.com/ethersphere/bee/pkg/jsonhttp"
)
func (s *server) topologyHandler(w http.ResponseWriter, r *http.Request) {
ms, ok := s.TopologyDriver.(json.Marshaler)
if !ok {
s.Logger.Error("topology driver cast to json marshaler")
jsonhttp.InternalServerError(w, "topology json marshal interface error")
return
}
b, err := ms.MarshalJSON()
if err != nil {
s.Logger.Errorf("topology marshal to json: %v", err)
jsonhttp.InternalServerError(w, err)
return
}
_, _ = io.Copy(w, bytes.NewBuffer(b))
}
// Copyright 2020 The Swarm Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package debugapi_test
import (
"encoding/json"
"errors"
"net/http"
"testing"
"github.com/ethersphere/bee/pkg/jsonhttp"
"github.com/ethersphere/bee/pkg/jsonhttp/jsonhttptest"
topmock "github.com/ethersphere/bee/pkg/topology/mock"
)
type topologyResponse struct {
Topology string `json:"topology"`
}
func TestTopologyOK(t *testing.T) {
marshalFunc := func() ([]byte, error) {
return json.Marshal(topologyResponse{Topology: "abcd"})
}
testServer := newTestServer(t, testServerOptions{
TopologyOpts: []topmock.Option{topmock.WithMarshalJSONFunc(marshalFunc)},
})
jsonhttptest.ResponseDirect(t, testServer.Client, http.MethodGet, "/topology", nil, http.StatusOK, topologyResponse{
Topology: "abcd",
})
}
func TestTopologyError(t *testing.T) {
marshalFunc := func() ([]byte, error) {
return nil, errors.New("error")
}
testServer := newTestServer(t, testServerOptions{
TopologyOpts: []topmock.Option{topmock.WithMarshalJSONFunc(marshalFunc)},
})
jsonhttptest.ResponseDirect(t, testServer.Client, http.MethodGet, "/topology", nil, http.StatusInternalServerError, jsonhttp.StatusResponse{
Message: "error",
Code: http.StatusInternalServerError,
})
}
......@@ -29,7 +29,6 @@ func (d *Discovery) BroadcastPeers(ctx context.Context, addressee swarm.Address,
d.records[addressee.String()] = append(d.records[addressee.String()], peer)
d.mtx.Unlock()
}
d.mtx.Lock()
d.ctr++
d.mtx.Unlock()
......@@ -48,3 +47,10 @@ func (d *Discovery) AddresseeRecords(addressee swarm.Address) (peers []swarm.Add
peers, exists = d.records[addressee.String()]
return
}
func (d *Discovery) Reset() {
d.mtx.Lock()
defer d.mtx.Unlock()
d.ctr = 0
d.records = make(map[string][]swarm.Address)
}
// Copyright 2020 The Swarm Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
/*
Package kademlia provides an implementation of the topology.Driver interface
in a way that a kademlia connectivity is actively maintained by the node.
A thorough explanation of the logic in the `manage()` forever loop:
The `manageC` channel gets triggered every time there's a change in the
information regarding peers we know about. This can be a result of: (1) A peer
has disconnected from us (2) A peer has been added to the list of
known peers (from discovery, debugapi, bootnode flag or just because it
was persisted in the address book and the node has been restarted).
So the information has been changed, and potentially upon disconnection,
the depth can travel to a shallower depth in result.
If a peer gets added through AddPeer, this does not necessarily infer
an immediate depth change, since the peer might end up in the backlog for
a long time until we actually need to connect to her.
The `manage()` forever-loop will connect to peers in order from shallower
to deeper depths. This is because of depth calculation method that prioritizes empty bins
That are shallower than depth. An in-depth look at `recalcDepth()` method
will clarify this (more below). So if we will connect to peers from deeper
to shallower depths, all peers in all bins will qualify as peers we'd like
to connect to (see `binSaturated` method), ending up connecting to everyone we know about.
Another important notion one must observe while inspecting how `manage()`
works, is that when we connect to peers depth can only move in one direction,
which is deeper. So this becomes our strategy and we operate with this in mind,
this is also why we iterate from shallower to deeper - since additional
connections to peers for whatever reason can only result in increasing depth.
Empty intermediate bins should be eliminated by the `binSaturated` method indicating
a bin size too short, which in turn means that connections should be established
within this bin. Empty bins have special status in terms of depth calculation
and as mentioned before they are prioritized over deeper, non empty bins and
they constitute as the node's depth when the latter is recalculated.
For the rationale behind this please refer to the appropriate chapters in the book of Swarm.
A special case of the `manage()` functionality is that when we iterate over
peers and we come across a peer that has PO >= depth, we would always like
to connect to that peer. This should always be enforced within the bounds of
the `binSaturated` function and guarantees an ever increasing kademlia depth
in an ever-increasing size of Swarm, resulting in smaller areas of responsibility
for the nodes, maintaining a general upper bound of the assigned nominal
area of responsibility in terms of actual storage requirement. See book of Swarm for more details.
Worth to note is that `manage()` will always try to initiate connections when
a bin is not saturated, however currently it will not try to eliminate connections
on bins which might be over-saturated. Ideally it should be very cheap to maintain a
connection to a peer in a bin, so we should theoretically not aspire to eliminate connections prematurely.
It is also safe to assume we will always have more than the lower bound of peers in a bin, why?
(1) Initially, we will always try to satisfy our own connectivity requirement to saturate the bin
(2) Later on, other peers will get notified about our advertised address and
will try to connect to us in order to satisfy their own connectivity thresholds
We should allow other nodes to dial in, in order to help them maintain a healthy topolgy.
It could be, however, that we would need to mark-and-sweep certain connections once a
theorical upper bound has been reached.
Depth calculation explained:
When we calculate depth we must keep in mind the following constraints:
(1) A nearest-neighborhood constitutes of an arbitrary lower bound of the
closest peers we know about, this is defined in `nnLowWatermark` and is currently set to `2`
(2) Empty bins which are shallower than depth constitute as the node's area of responsibility
As of such, we would calculate depth in the following manner:
(1) Iterate over all peers we know about, from deepest (closest) to shallowest, and count until we reach `nnLowWatermark`
(2) Once we reach `nnLowWatermark`, mark current bin as depth candidate
(3) Iterate over all bins from shallowest to deepest, and look for the shallowest empty bin
(4) If the shallowest empty bin is shallower than the depth candidate - select shallowest bin as depth, otherwise select the candidate
Note: when we are connected to less or equal to `nnLowWatermark` peers, the
depth will always be considered `0`, thus a short-circuit is handling this edge
case explicitly in the `recalcDepth` method.
TODO: add pseudo-code how to calculate depth.
A few examples to depth calculation:
1. empty kademlia
bin | nodes
-------------
==DEPTH==
0 0
1 0
2 0
3 0
4 0
depth: 0
2. less or equal to two peers (nnLowWatermark=2) (a)
bin | nodes
-------------
==DEPTH==
0 1
1 1
2 0
3 0
4 0
depth: 0
3. less or equal to two peers (nnLowWatermark=2) (b)
bin | nodes
-------------
==DEPTH==
0 1
1 0
2 1
3 0
4 0
depth: 0
4. less or equal to two peers (nnLowWatermark=2) (c)
bin | nodes
-------------
==DEPTH==
0 2
1 0
2 0
3 0
4 0
depth: 0
5. empty shallow bin
bin | nodes
-------------
0 1
==DEPTH==
1 0
2 1
3 1
4 0
depth: 1 (depth candidate is 2, but 1 is shallower and empty)
6. no empty shallower bin, depth after nnLowerWatermark found
bin | nodes
-------------
0 1
1 1
==DEPTH==
2 1
3 1
4 0
depth: 2 (depth candidate is 2, shallowest empty bin is 4)
7. last bin size >= nnLowWatermark
bin | nodes
-------------
0 1
1 1
2 1
==DEPTH==
3 3
4 0
depth: 3 (depth candidate is 3, shallowest empty bin is 4)
8. all bins full
bin | nodes
-------------
0 1
1 1
2 1
3 3
==DEPTH==
4 2
depth: 4 (depth candidate is 4, no empty bins)
*/
package kademlia
// Copyright 2020 The Swarm Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package kademlia
var MaxBins = maxBins
var TimeToRetry = &timeToRetry
This diff is collapsed.
This diff is collapsed.
......@@ -4,12 +4,6 @@
package pslice
import "github.com/ethersphere/bee/pkg/swarm"
func PSlicePeers(p *PSlice) []swarm.Address {
return p.peers
}
func PSliceBins(p *PSlice) []uint {
return p.bins
}
......@@ -16,8 +16,8 @@ import (
// in order to reduce duplicate PO calculation which is normally known and already needed in the
// calling context.
type PSlice struct {
peers []swarm.Address
bins []uint
peers []swarm.Address // the slice of peers
bins []uint // the indexes of every proximity order in the peers slice, index is po, value is index of peers slice
sync.Mutex
}
......@@ -61,7 +61,7 @@ func (s *PSlice) EachBin(pf topology.EachPeerFunc) error {
return nil
}
// EachBinRev iterates over all peers from shallowest to deepest.
// EachBinRev iterates over all peers from shallowest bin to deepest.
func (s *PSlice) EachBinRev(pf topology.EachPeerFunc) error {
s.Lock()
defer s.Unlock()
......@@ -95,6 +95,13 @@ func (s *PSlice) EachBinRev(pf topology.EachPeerFunc) error {
return nil
}
func (s *PSlice) Length() int {
s.Lock()
defer s.Unlock()
return len(s.peers)
}
// ShallowestEmpty returns the shallowest empty bin if one exists.
// If such bin does not exists, returns true as bool value.
func (s *PSlice) ShallowestEmpty() (bin uint8, none bool) {
......@@ -143,10 +150,10 @@ func (s *PSlice) Add(addr swarm.Address, po uint8) {
if e, _ := s.exists(addr); e {
return
}
head := s.peers[:s.bins[po]]
tail := append([]swarm.Address{addr}, s.peers[s.bins[po]:]...)
s.peers = append(head, tail...)
s.incDeeper(po)
}
......@@ -164,7 +171,7 @@ func (s *PSlice) Remove(addr swarm.Address, po uint8) {
s.decDeeper(po)
}
// incDeeper increments the peers slice bin index for proximity order > po.
// incDeeper increments the peers slice bin index for proximity order > po for non-empty bins only.
// Must be called under lock.
func (s *PSlice) incDeeper(po uint8) {
if po > uint8(len(s.bins)) {
......@@ -175,9 +182,7 @@ func (s *PSlice) incDeeper(po uint8) {
// don't increment if the value in k.bins == len(k.peers)
// otherwise the calling context gets an out of bound error
// when accessing the slice
if s.bins[i] < uint(len(s.peers)) {
s.bins[i]++
}
s.bins[i]++
}
}
......
......@@ -304,8 +304,7 @@ func testIterator(t *testing.T, ps *pslice.PSlice, skipNext, stop bool, iteratio
}
func chkLen(t *testing.T, ps *pslice.PSlice, l int) {
pp := pslice.PSlicePeers(ps)
if lp := len(pp); lp != l {
if lp := ps.Length(); lp != l {
t.Fatalf("length mismatch, want %d got %d", l, lp)
}
}
......
......@@ -20,6 +20,7 @@ import (
"github.com/ethersphere/bee/pkg/crypto"
"github.com/ethersphere/bee/pkg/debugapi"
"github.com/ethersphere/bee/pkg/hive"
"github.com/ethersphere/bee/pkg/kademlia"
"github.com/ethersphere/bee/pkg/keystore"
filekeystore "github.com/ethersphere/bee/pkg/keystore/file"
memkeystore "github.com/ethersphere/bee/pkg/keystore/mem"
......@@ -36,7 +37,6 @@ import (
mockinmem "github.com/ethersphere/bee/pkg/statestore/mock"
"github.com/ethersphere/bee/pkg/storage"
"github.com/ethersphere/bee/pkg/swarm"
"github.com/ethersphere/bee/pkg/topology/full"
"github.com/ethersphere/bee/pkg/tracing"
"github.com/ethersphere/bee/pkg/validator"
ma "github.com/multiformats/go-multiaddr"
......@@ -168,10 +168,10 @@ func NewBee(o Options) (*Bee, error) {
return nil, fmt.Errorf("hive service: %w", err)
}
topologyDriver := full.New(hive, addressbook, p2ps, logger, address)
topologyDriver := kademlia.New(kademlia.Options{Base: address, Discovery: hive, AddressBook: addressbook, P2P: p2ps, Logger: logger})
b.topologyCloser = topologyDriver
hive.SetPeerAddedHandler(topologyDriver.AddPeer)
p2ps.SetPeerAddedHandler(topologyDriver.AddPeer)
p2ps.SetNotifier(topologyDriver)
addrs, err := p2ps.Addresses()
if err != nil {
return nil, fmt.Errorf("get server addresses: %w", err)
......
......@@ -7,11 +7,15 @@ package libp2p_test
import (
"context"
"errors"
"sync"
"testing"
"time"
"github.com/ethersphere/bee/pkg/p2p"
"github.com/ethersphere/bee/pkg/p2p/libp2p"
"github.com/ethersphere/bee/pkg/p2p/libp2p/internal/handshake"
"github.com/ethersphere/bee/pkg/swarm"
"github.com/ethersphere/bee/pkg/topology"
libp2ppeer "github.com/libp2p/go-libp2p-core/peer"
)
......@@ -296,3 +300,138 @@ func TestConnectRepeatHandshake(t *testing.T) {
expectPeersEventually(t, s2)
expectPeersEventually(t, s1)
}
func TestTopologyNotifiee(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
var (
mtx sync.Mutex
n1connectedAddr swarm.Address
n1disconnectedAddr swarm.Address
n2connectedAddr swarm.Address
n2disconnectedAddr swarm.Address
n1c = func(_ context.Context, a swarm.Address) error {
mtx.Lock()
defer mtx.Unlock()
expectZeroAddress(t, n1connectedAddr) // fail if set more than once
n1connectedAddr = a
return nil
}
n1d = func(a swarm.Address) {
mtx.Lock()
defer mtx.Unlock()
n1disconnectedAddr = a
}
n2c = func(_ context.Context, a swarm.Address) error {
mtx.Lock()
defer mtx.Unlock()
expectZeroAddress(t, n2connectedAddr) // fail if set more than once
n2connectedAddr = a
return nil
}
n2d = func(a swarm.Address) {
mtx.Lock()
defer mtx.Unlock()
n2disconnectedAddr = a
}
)
notifier1 := mockNotifier(n1c, n1d)
s1, overlay1 := newService(t, 1, libp2p.Options{})
s1.SetNotifier(notifier1)
notifier2 := mockNotifier(n2c, n2d)
s2, overlay2 := newService(t, 1, libp2p.Options{})
s2.SetNotifier(notifier2)
addr := serviceUnderlayAddress(t, s1)
// s2 connects to s1, thus the notifiee on s1 should be called on Connect
overlay, err := s2.Connect(ctx, addr)
if err != nil {
t.Fatal(err)
}
expectPeers(t, s2, overlay1)
expectPeersEventually(t, s1, overlay2)
// expect that n1 notifee called with s2 overlay
waitAddrSet(t, &n1connectedAddr, &mtx, overlay2)
mtx.Lock()
expectZeroAddress(t, n1disconnectedAddr, n2connectedAddr, n2disconnectedAddr)
mtx.Unlock()
// s2 disconnects from s1 so s1 disconnect notifiee should be called
if err := s2.Disconnect(overlay); err != nil {
t.Fatal(err)
}
expectPeers(t, s2)
expectPeersEventually(t, s1)
waitAddrSet(t, &n1disconnectedAddr, &mtx, overlay2)
// note that both n1disconnect and n2disconnect callbacks are called after just
// one disconnect. this is due to the fact the when the libp2p abstraction is explicitly
// called to disconnect from a peer, it will also notify the topology notifiee, since
// peer disconnections can also result from components from outside the bound of the
// topology driver
mtx.Lock()
expectZeroAddress(t, n2connectedAddr)
mtx.Unlock()
addr2 := serviceUnderlayAddress(t, s2)
// s1 connects to s2, thus the notifiee on s2 should be called on Connect
o2, err := s1.Connect(ctx, addr2)
if err != nil {
t.Fatal(err)
}
expectPeers(t, s1, overlay2)
expectPeersEventually(t, s2, overlay1)
waitAddrSet(t, &n2connectedAddr, &mtx, overlay1)
// s1 disconnects from s2 so s2 disconnect notifiee should be called
if err := s1.Disconnect(o2); err != nil {
t.Fatal(err)
}
expectPeers(t, s1)
expectPeersEventually(t, s2)
waitAddrSet(t, &n2disconnectedAddr, &mtx, overlay1)
}
func waitAddrSet(t *testing.T, addr *swarm.Address, mtx *sync.Mutex, exp swarm.Address) {
t.Helper()
for i := 0; i < 20; i++ {
mtx.Lock()
if addr.Equal(exp) {
mtx.Unlock()
return
}
mtx.Unlock()
time.Sleep(10 * time.Millisecond)
}
t.Fatal("timed out waiting for address to be set")
}
type notifiee struct {
connected func(context.Context, swarm.Address) error
disconnected func(swarm.Address)
}
func (n *notifiee) Connected(c context.Context, a swarm.Address) error {
return n.connected(c, a)
}
func (n *notifiee) Disconnected(a swarm.Address) {
n.disconnected(a)
}
func mockNotifier(c cFunc, d dFunc) topology.Notifier {
return &notifiee{connected: c, disconnected: d}
}
type cFunc func(context.Context, swarm.Address) error
type dFunc func(swarm.Address)
......@@ -18,6 +18,7 @@ import (
"github.com/ethersphere/bee/pkg/p2p/libp2p/internal/breaker"
handshake "github.com/ethersphere/bee/pkg/p2p/libp2p/internal/handshake"
"github.com/ethersphere/bee/pkg/swarm"
"github.com/ethersphere/bee/pkg/topology"
"github.com/ethersphere/bee/pkg/tracing"
"github.com/libp2p/go-libp2p"
autonat "github.com/libp2p/go-libp2p-autonat-svc"
......@@ -49,7 +50,7 @@ type Service struct {
handshakeService *handshake.Service
addressbook addressbook.Putter
peers *peerRegistry
peerHandler func(context.Context, swarm.Address) error
topologyNotifier topology.Notifier
conectionBreaker breaker.Interface
logger logging.Logger
tracer *tracing.Tracer
......@@ -214,8 +215,8 @@ func New(ctx context.Context, signer beecrypto.Signer, networkID uint64, overlay
return
}
if s.peerHandler != nil {
if err := s.peerHandler(ctx, i.Overlay); err != nil {
if s.topologyNotifier != nil {
if err := s.topologyNotifier.Connected(ctx, i.Overlay); err != nil {
s.logger.Debugf("peerhandler error: %s: %v", peerID, err)
}
}
......@@ -361,7 +362,6 @@ func (s *Service) disconnect(peerID libp2ppeer.ID) error {
if err := s.host.Network().ClosePeer(peerID); err != nil {
return err
}
s.peers.remove(peerID)
return nil
}
......@@ -370,8 +370,9 @@ func (s *Service) Peers() []p2p.Peer {
return s.peers.peers()
}
func (s *Service) SetPeerAddedHandler(h func(context.Context, swarm.Address) error) {
s.peerHandler = h
func (s *Service) SetNotifier(n topology.Notifier) {
s.topologyNotifier = n
s.peers.setDisconnecter(n)
}
func (s *Service) NewStream(ctx context.Context, overlay swarm.Address, headers p2p.Headers, protocolName, protocolVersion, streamName string) (p2p.Stream, error) {
......
......@@ -82,7 +82,7 @@ func expectPeers(t *testing.T, s *libp2p.Service, addrs ...swarm.Address) {
}
// expectPeersEventually validates that peers with addresses are connected with
// retires. It is supposed to be used to validate asynchronous connecting on the
// retries. It is supposed to be used to validate asynchronous connecting on the
// peer that is connected to.
func expectPeersEventually(t *testing.T, s *libp2p.Service, addrs ...swarm.Address) {
t.Helper()
......@@ -115,6 +115,15 @@ func expectPeersEventually(t *testing.T, s *libp2p.Service, addrs ...swarm.Addre
}
}
func expectZeroAddress(t *testing.T, addrs ...swarm.Address) {
t.Helper()
for i, a := range addrs {
if !a.Equal(swarm.ZeroAddress) {
t.Fatalf("address did not equal zero address. index %d", i)
}
}
}
func serviceUnderlayAddress(t *testing.T, s *libp2p.Service) multiaddr.Multiaddr {
t.Helper()
......
......@@ -11,6 +11,7 @@ import (
"github.com/ethersphere/bee/pkg/p2p"
"github.com/ethersphere/bee/pkg/swarm"
"github.com/ethersphere/bee/pkg/topology"
"github.com/libp2p/go-libp2p-core/network"
libp2ppeer "github.com/libp2p/go-libp2p-core/peer"
)
......@@ -21,7 +22,8 @@ type peerRegistry struct {
connections map[libp2ppeer.ID]map[network.Conn]struct{} // list of connections for safe removal on Disconnect notification
mu sync.RWMutex
network.Notifiee // peerRegistry can be the receiver for network.Notify
disconnecter topology.Disconnecter // peerRegistry notifies topology on peer disconnection
network.Notifiee // peerRegistry can be the receiver for network.Notify
}
func newPeerRegistry() *peerRegistry {
......@@ -29,7 +31,8 @@ func newPeerRegistry() *peerRegistry {
underlays: make(map[string]libp2ppeer.ID),
overlays: make(map[libp2ppeer.ID]swarm.Address),
connections: make(map[libp2ppeer.ID]map[network.Conn]struct{}),
Notifiee: new(network.NoopNotifiee),
Notifiee: new(network.NoopNotifiee),
}
}
......@@ -60,6 +63,9 @@ func (r *peerRegistry) Disconnected(_ network.Network, c network.Conn) {
if len(r.connections[peerID]) == 0 {
delete(r.connections, peerID)
}
if r.disconnecter != nil {
r.disconnecter.Disconnected(overlay)
}
}
func (r *peerRegistry) peers() []p2p.Peer {
......@@ -117,4 +123,12 @@ func (r *peerRegistry) remove(peerID libp2ppeer.ID) {
delete(r.underlays, overlay.ByteString())
delete(r.connections, peerID)
r.mu.Unlock()
if r.disconnecter != nil {
r.disconnecter.Disconnected(overlay)
}
}
func (r *peerRegistry) setDisconnecter(d topology.Disconnecter) {
r.disconnecter = d
}
......@@ -10,16 +10,17 @@ import (
"github.com/ethersphere/bee/pkg/p2p"
"github.com/ethersphere/bee/pkg/swarm"
"github.com/ethersphere/bee/pkg/topology"
ma "github.com/multiformats/go-multiaddr"
)
type Service struct {
addProtocolFunc func(p2p.ProtocolSpec) error
connectFunc func(ctx context.Context, addr ma.Multiaddr) (overlay swarm.Address, err error)
disconnectFunc func(overlay swarm.Address) error
peersFunc func() []p2p.Peer
setPeerAddedHandlerFunc func(func(context.Context, swarm.Address) error)
addressesFunc func() ([]ma.Multiaddr, error)
addProtocolFunc func(p2p.ProtocolSpec) error
connectFunc func(ctx context.Context, addr ma.Multiaddr) (overlay swarm.Address, err error)
disconnectFunc func(overlay swarm.Address) error
peersFunc func() []p2p.Peer
setNotifierFunc func(topology.Notifier)
addressesFunc func() ([]ma.Multiaddr, error)
}
func WithAddProtocolFunc(f func(p2p.ProtocolSpec) error) Option {
......@@ -46,9 +47,9 @@ func WithPeersFunc(f func() []p2p.Peer) Option {
})
}
func WithSetPeerAddedHandlerFunc(f func(func(context.Context, swarm.Address) error)) Option {
func WithSetNotifierFunc(f func(topology.Notifier)) Option {
return optionFunc(func(s *Service) {
s.setPeerAddedHandlerFunc = f
s.setNotifierFunc = f
})
}
......@@ -87,12 +88,12 @@ func (s *Service) Disconnect(overlay swarm.Address) error {
return s.disconnectFunc(overlay)
}
func (s *Service) SetPeerAddedHandler(f func(context.Context, swarm.Address) error) {
if s.setPeerAddedHandlerFunc == nil {
func (s *Service) SetNotifier(f topology.Notifier) {
if s.setNotifierFunc == nil {
return
}
s.setPeerAddedHandlerFunc(f)
s.setNotifierFunc(f)
}
func (s *Service) Addresses() ([]ma.Multiaddr, error) {
......
......@@ -9,6 +9,7 @@ import (
"io"
"github.com/ethersphere/bee/pkg/swarm"
"github.com/ethersphere/bee/pkg/topology"
ma "github.com/multiformats/go-multiaddr"
)
......@@ -18,7 +19,7 @@ type Service interface {
Connect(ctx context.Context, addr ma.Multiaddr) (overlay swarm.Address, err error)
Disconnect(overlay swarm.Address) error
Peers() []Peer
SetPeerAddedHandler(func(context.Context, swarm.Address) error)
SetNotifier(topology.Notifier)
Addresses() ([]ma.Multiaddr, error)
}
......
......@@ -6,7 +6,9 @@ package full
import (
"context"
"encoding/json"
"errors"
"fmt"
"math/rand"
"sync"
"time"
......@@ -155,6 +157,28 @@ func (d *driver) ClosestPeer(addr swarm.Address) (swarm.Address, error) {
return closest, nil
}
func (d *driver) Connected(ctx context.Context, addr swarm.Address) error {
return d.AddPeer(ctx, addr)
}
func (d *driver) Disconnected(swarm.Address) {
// TODO: implement if necessary
}
func (d *driver) MarshalJSON() ([]byte, error) {
var peers []string
for p := range d.receivedPeers {
peers = append(peers, p)
}
return json.Marshal(struct {
Peers []string `json:"peers"`
}{Peers: peers})
}
func (d *driver) String() string {
return fmt.Sprintf("%s", d.receivedPeers)
}
func (d *driver) Close() error {
close(d.quit)
return nil
......
......@@ -13,11 +13,12 @@ import (
)
type mock struct {
peers []swarm.Address
closestPeer swarm.Address
closestPeerErr error
addPeerErr error
mtx sync.Mutex
peers []swarm.Address
closestPeer swarm.Address
closestPeerErr error
addPeerErr error
marshalJSONFunc func() ([]byte, error)
mtx sync.Mutex
}
func WithAddPeerErr(err error) Option {
......@@ -38,6 +39,12 @@ func WithClosestPeerErr(err error) Option {
})
}
func WithMarshalJSONFunc(f func() ([]byte, error)) Option {
return optionFunc(func(d *mock) {
d.marshalJSONFunc = f
})
}
func NewTopologyDriver(opts ...Option) topology.Driver {
d := new(mock)
for _, o := range opts {
......@@ -56,6 +63,13 @@ func (d *mock) AddPeer(_ context.Context, addr swarm.Address) error {
d.mtx.Unlock()
return nil
}
func (d *mock) Connected(ctx context.Context, addr swarm.Address) error {
return d.AddPeer(ctx, addr)
}
func (d *mock) Disconnected(swarm.Address) {
panic("todo")
}
func (d *mock) Peers() []swarm.Address {
return d.peers
......@@ -65,6 +79,10 @@ func (d *mock) ClosestPeer(addr swarm.Address) (peerAddr swarm.Address, err erro
return d.closestPeer, d.closestPeerErr
}
func (d *mock) MarshalJSON() ([]byte, error) {
return d.marshalJSONFunc()
}
func (d *mock) Close() error {
return nil
}
......
......@@ -12,19 +12,39 @@ import (
"github.com/ethersphere/bee/pkg/swarm"
)
var ErrNotFound = errors.New("no peer found")
var ErrWantSelf = errors.New("node wants self")
var (
ErrNotFound = errors.New("no peer found")
ErrWantSelf = errors.New("node wants self")
)
type Driver interface {
PeerAdder
ClosestPeerer
Notifier
io.Closer
}
type Notifier interface {
Connecter
Disconnecter
}
type PeerAdder interface {
// AddPeer is called when a peer is added to the topology backlog
// for further processing by connectivity strategy.
AddPeer(ctx context.Context, addr swarm.Address) error
}
type Connecter interface {
// Connected is called when a peer dials in.
Connected(context.Context, swarm.Address) error
}
type Disconnecter interface {
// Disconnected is called when a peer disconnects.
Disconnected(swarm.Address)
}
type ClosestPeerer interface {
ClosestPeer(addr swarm.Address) (peerAddr swarm.Address, err error)
}
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment