Commit 8a73821b authored by Nemanja Zbiljić's avatar Nemanja Zbiljić Committed by GitHub

kademlia: balanced bins in kademlia (#1207)

Co-authored-by: default avatarMetacertain <metacertain@gmail.com>
parent 0e4333fb
...@@ -9,6 +9,8 @@ import ( ...@@ -9,6 +9,8 @@ import (
"encoding/json" "encoding/json"
"errors" "errors"
"fmt" "fmt"
"math"
"math/bits"
"sync" "sync"
"time" "time"
...@@ -26,6 +28,7 @@ const ( ...@@ -26,6 +28,7 @@ const (
nnLowWatermark = 2 // the number of peers in consecutive deepest bins that constitute as nearest neighbours nnLowWatermark = 2 // the number of peers in consecutive deepest bins that constitute as nearest neighbours
maxConnAttempts = 3 // when there is maxConnAttempts failed connect calls for a given peer it is considered non-connectable maxConnAttempts = 3 // when there is maxConnAttempts failed connect calls for a given peer it is considered non-connectable
maxBootnodeAttempts = 3 // how many attempts to dial to bootnodes before giving up maxBootnodeAttempts = 3 // how many attempts to dial to bootnodes before giving up
defaultBitSuffixLength = 2 // the number of bits used to create pseudo addresses for balancing
) )
var ( var (
...@@ -45,6 +48,7 @@ type Options struct { ...@@ -45,6 +48,7 @@ type Options struct {
Bootnodes []ma.Multiaddr Bootnodes []ma.Multiaddr
StandaloneMode bool StandaloneMode bool
BootnodeMode bool BootnodeMode bool
BitSuffixLength int
} }
// Kad is the Swarm forwarding kademlia implementation. // Kad is the Swarm forwarding kademlia implementation.
...@@ -54,6 +58,8 @@ type Kad struct { ...@@ -54,6 +58,8 @@ type Kad struct {
addressBook addressbook.Interface // address book to get underlays addressBook addressbook.Interface // address book to get underlays
p2p p2p.Service // p2p service to connect to nodes with p2p p2p.Service // p2p service to connect to nodes with
saturationFunc binSaturationFunc // pluggable saturation function saturationFunc binSaturationFunc // pluggable saturation function
bitSuffixLength int // additional depth of common prefix for bin
commonBinPrefixes [][]swarm.Address // list of address prefixes for each bin
connectedPeers *pslice.PSlice // a slice of peers sorted and indexed by po, indexes kept in `bins` connectedPeers *pslice.PSlice // a slice of peers sorted and indexed by po, indexes kept in `bins`
knownPeers *pslice.PSlice // both are po aware slice of addresses knownPeers *pslice.PSlice // both are po aware slice of addresses
bootnodes []ma.Multiaddr bootnodes []ma.Multiaddr
...@@ -82,6 +88,9 @@ func New(base swarm.Address, addressbook addressbook.Interface, discovery discov ...@@ -82,6 +88,9 @@ func New(base swarm.Address, addressbook addressbook.Interface, discovery discov
if o.SaturationFunc == nil { if o.SaturationFunc == nil {
o.SaturationFunc = binSaturated o.SaturationFunc = binSaturated
} }
if o.BitSuffixLength == 0 {
o.BitSuffixLength = defaultBitSuffixLength
}
k := &Kad{ k := &Kad{
base: base, base: base,
...@@ -89,6 +98,8 @@ func New(base swarm.Address, addressbook addressbook.Interface, discovery discov ...@@ -89,6 +98,8 @@ func New(base swarm.Address, addressbook addressbook.Interface, discovery discov
addressBook: addressbook, addressBook: addressbook,
p2p: p2p, p2p: p2p,
saturationFunc: o.SaturationFunc, saturationFunc: o.SaturationFunc,
bitSuffixLength: o.BitSuffixLength,
commonBinPrefixes: make([][]swarm.Address, int(swarm.MaxBins)),
connectedPeers: pslice.New(int(swarm.MaxBins)), connectedPeers: pslice.New(int(swarm.MaxBins)),
knownPeers: pslice.New(int(swarm.MaxBins)), knownPeers: pslice.New(int(swarm.MaxBins)),
bootnodes: o.Bootnodes, bootnodes: o.Bootnodes,
...@@ -102,9 +113,94 @@ func New(base swarm.Address, addressbook addressbook.Interface, discovery discov ...@@ -102,9 +113,94 @@ func New(base swarm.Address, addressbook addressbook.Interface, discovery discov
wg: sync.WaitGroup{}, wg: sync.WaitGroup{},
} }
if k.bitSuffixLength > 0 {
k.generateCommonBinPrefixes()
}
return k return k
} }
func (k *Kad) generateCommonBinPrefixes() {
bitCombinationsCount := int(math.Pow(2, float64(k.bitSuffixLength)))
bitSufixes := make([]uint8, bitCombinationsCount)
for i := 0; i < bitCombinationsCount; i++ {
bitSufixes[i] = uint8(i)
}
addr := swarm.MustParseHexAddress(k.base.String())
addrBytes := addr.Bytes()
_ = addrBytes
binPrefixes := k.commonBinPrefixes
// copy base address
for i := range binPrefixes {
binPrefixes[i] = make([]swarm.Address, bitCombinationsCount)
}
for i := range binPrefixes {
for j := range binPrefixes[i] {
pseudoAddrBytes := make([]byte, len(k.base.Bytes()))
copy(pseudoAddrBytes, k.base.Bytes())
binPrefixes[i][j] = swarm.NewAddress(pseudoAddrBytes)
}
}
for i := range binPrefixes {
for j := range binPrefixes[i] {
pseudoAddrBytes := binPrefixes[i][j].Bytes()
// flip first bit for bin
indexByte, posBit := i/8, i%8
if hasBit(bits.Reverse8(pseudoAddrBytes[indexByte]), uint8(posBit)) {
pseudoAddrBytes[indexByte] = bits.Reverse8(clearBit(bits.Reverse8(pseudoAddrBytes[indexByte]), uint8(posBit)))
} else {
pseudoAddrBytes[indexByte] = bits.Reverse8(setBit(bits.Reverse8(pseudoAddrBytes[indexByte]), uint8(posBit)))
}
// set pseudo suffix
bitSuffixPos := k.bitSuffixLength - 1
for l := i + 1; l < i+k.bitSuffixLength+1; l++ {
index, pos := l/8, l%8
if hasBit(bitSufixes[j], uint8(bitSuffixPos)) {
pseudoAddrBytes[index] = bits.Reverse8(setBit(bits.Reverse8(pseudoAddrBytes[index]), uint8(pos)))
} else {
pseudoAddrBytes[index] = bits.Reverse8(clearBit(bits.Reverse8(pseudoAddrBytes[index]), uint8(pos)))
}
bitSuffixPos--
}
// clear rest of the bits
for l := i + k.bitSuffixLength + 1; l < len(pseudoAddrBytes)*8; l++ {
index, pos := l/8, l%8
pseudoAddrBytes[index] = bits.Reverse8(clearBit(bits.Reverse8(pseudoAddrBytes[index]), uint8(pos)))
}
}
}
}
// Clears the bit at pos in n.
func clearBit(n, pos uint8) uint8 {
mask := ^(uint8(1) << pos)
n &= mask
return n
}
// Sets the bit at pos in the integer n.
func setBit(n, pos uint8) uint8 {
n |= (1 << pos)
return n
}
func hasBit(n, pos uint8) bool {
val := n & (1 << pos)
return (val > 0)
}
// manage is a forever loop that manages the connection to new peers // manage is a forever loop that manages the connection to new peers
// once they get added or once others leave. // once they get added or once others leave.
func (k *Kad) manage() { func (k *Kad) manage() {
...@@ -141,7 +237,112 @@ func (k *Kad) manage() { ...@@ -141,7 +237,112 @@ func (k *Kad) manage() {
if k.standalone { if k.standalone {
continue continue
} }
err := k.knownPeers.EachBinRev(func(peer swarm.Address, po uint8) (bool, bool, error) {
// attempt balanced connection first
err := func() error {
// for each bin
for i := range k.commonBinPrefixes {
// and each pseudo address
for j := range k.commonBinPrefixes[i] {
pseudoAddr := k.commonBinPrefixes[i][j]
closestConnectedPeer, err := closestPeer(k.connectedPeers, pseudoAddr, swarm.ZeroAddress)
if err != nil {
if errors.Is(err, topology.ErrNotFound) {
break
}
k.logger.Errorf("closest connected peer: %v", err)
continue
}
// check proximity
closestConnectedPO := swarm.ExtendedProximity(closestConnectedPeer.Bytes(), pseudoAddr.Bytes())
if int(closestConnectedPO) < i+k.bitSuffixLength+1 {
// connect to closest known peer
closestKnownPeer, err := closestPeer(k.knownPeers, pseudoAddr, swarm.ZeroAddress)
if err != nil {
if errors.Is(err, topology.ErrNotFound) {
break
}
k.logger.Errorf("closest known peer: %v", err)
continue
}
if k.connectedPeers.Exists(closestKnownPeer) {
continue
}
closestKnownPeerPO := swarm.ExtendedProximity(closestKnownPeer.Bytes(), pseudoAddr.Bytes())
if int(closestKnownPeerPO) < i+k.bitSuffixLength+1 {
continue
}
peer := closestKnownPeer
bzzAddr, err := k.addressBook.Get(peer)
if err != nil {
if err == addressbook.ErrNotFound {
k.logger.Debugf("failed to get address book entry for peer: %s", peer.String())
peerToRemove = peer
return errMissingAddressBookEntry
}
// either a peer is not known in the address book, in which case it
// should be removed, or that some severe I/O problem is at hand
return err
}
po := swarm.Proximity(k.base.Bytes(), peer.Bytes())
err = k.connect(ctx, peer, bzzAddr.Underlay, po)
if err != nil {
if errors.Is(err, errOverlayMismatch) {
k.knownPeers.Remove(peer, po)
if err := k.addressBook.Remove(peer); err != nil {
k.logger.Debugf("could not remove peer from addressbook: %s", peer.String())
}
}
k.logger.Debugf("peer not reachable from kademlia %s: %v", bzzAddr.String(), err)
k.logger.Warningf("peer not reachable when attempting to connect")
// continue to next
return nil
}
k.waitNextMu.Lock()
k.waitNext[peer.String()] = retryInfo{tryAfter: time.Now().Add(shortRetry)}
k.waitNextMu.Unlock()
k.connectedPeers.Add(peer, po)
k.depthMu.Lock()
k.depth = recalcDepth(k.connectedPeers)
k.depthMu.Unlock()
k.logger.Debugf("connected to peer: %s for bin: %d", peer, i)
k.notifyPeerSig()
}
}
}
return nil
}()
k.logger.Tracef("kademlia balanced connector took %s to finish", time.Since(start))
if err != nil {
if errors.Is(err, errMissingAddressBookEntry) {
po := swarm.Proximity(k.base.Bytes(), peerToRemove.Bytes())
k.knownPeers.Remove(peerToRemove, po)
} else {
k.logger.Errorf("kademlia manage loop iterator: %v", err)
}
}
err = k.knownPeers.EachBinRev(func(peer swarm.Address, po uint8) (bool, bool, error) {
if k.connectedPeers.Exists(peer) { if k.connectedPeers.Exists(peer) {
return false, false, nil return false, false, nil
...@@ -550,6 +751,46 @@ func (k *Kad) notifyPeerSig() { ...@@ -550,6 +751,46 @@ func (k *Kad) notifyPeerSig() {
} }
} }
func closestPeer(peers *pslice.PSlice, addr swarm.Address, skipPeers ...swarm.Address) (swarm.Address, error) {
closest := swarm.Address{}
err := peers.EachBinRev(func(peer swarm.Address, po uint8) (bool, bool, error) {
for _, a := range skipPeers {
if a.Equal(peer) {
return false, false, nil
}
}
if closest.IsZero() {
closest = peer
return false, false, nil
}
dcmp, err := swarm.DistanceCmp(addr.Bytes(), closest.Bytes(), peer.Bytes())
if err != nil {
return false, false, err
}
switch dcmp {
case 0:
// do nothing
case -1:
// current peer is closer
closest = peer
case 1:
// closest is already closer to chunk
// do nothing
}
return false, false, nil
})
if err != nil {
return swarm.Address{}, err
}
// check if found
if closest.IsZero() {
return swarm.Address{}, topology.ErrNotFound
}
return closest, nil
}
// ClosestPeer returns the closest peer to a given address. // ClosestPeer returns the closest peer to a given address.
func (k *Kad) ClosestPeer(addr swarm.Address, skipPeers ...swarm.Address) (swarm.Address, error) { func (k *Kad) ClosestPeer(addr swarm.Address, skipPeers ...swarm.Address) (swarm.Address, error) {
if k.connectedPeers.Length() == 0 { if k.connectedPeers.Length() == 0 {
...@@ -642,6 +883,32 @@ func (k *Kad) neighborhoodDepth() uint8 { ...@@ -642,6 +883,32 @@ func (k *Kad) neighborhoodDepth() uint8 {
return k.depth return k.depth
} }
// IsBalanced returns if Kademlia is balanced to bin.
func (k *Kad) IsBalanced(bin uint8) bool {
k.depthMu.RLock()
defer k.depthMu.RUnlock()
if int(bin) > len(k.commonBinPrefixes) {
return false
}
// for each pseudo address
for i := range k.commonBinPrefixes[bin] {
pseudoAddr := k.commonBinPrefixes[bin][i]
closestConnectedPeer, err := closestPeer(k.connectedPeers, pseudoAddr, swarm.ZeroAddress)
if err != nil {
return false
}
closestConnectedPO := swarm.ExtendedProximity(closestConnectedPeer.Bytes(), pseudoAddr.Bytes())
if int(closestConnectedPO) < int(bin)+k.bitSuffixLength+1 {
return false
}
}
return true
}
// MarshalJSON returns a JSON representation of Kademlia. // MarshalJSON returns a JSON representation of Kademlia.
func (k *Kad) MarshalJSON() ([]byte, error) { func (k *Kad) MarshalJSON() ([]byte, error) {
return k.marshal(false) return k.marshal(false)
......
...@@ -167,7 +167,7 @@ func TestManage(t *testing.T) { ...@@ -167,7 +167,7 @@ func TestManage(t *testing.T) {
saturationFunc = func(bin uint8, peers, connected *pslice.PSlice) (bool, bool) { saturationFunc = func(bin uint8, peers, connected *pslice.PSlice) (bool, bool) {
return saturationVal, overSaturationVal return saturationVal, overSaturationVal
} }
base, kad, ab, _, signer = newTestKademlia(&conns, nil, kademlia.Options{SaturationFunc: saturationFunc}) base, kad, ab, _, signer = newTestKademlia(&conns, nil, kademlia.Options{BitSuffixLength: -1, SaturationFunc: saturationFunc})
) )
if err := kad.Start(context.Background()); err != nil { if err := kad.Start(context.Background()); err != nil {
...@@ -202,6 +202,65 @@ func TestManage(t *testing.T) { ...@@ -202,6 +202,65 @@ func TestManage(t *testing.T) {
waitCounter(t, &conns, 0) waitCounter(t, &conns, 0)
} }
func TestManageWithBalancing(t *testing.T) {
// use "fixed" seed for this
rand.Seed(2)
var (
conns int32 // how many connect calls were made to the p2p mock
saturationFuncImpl *func(bin uint8, peers, connected *pslice.PSlice) (bool, bool)
saturationFunc = func(bin uint8, peers, connected *pslice.PSlice) (bool, bool) {
f := *saturationFuncImpl
return f(bin, peers, connected)
}
base, kad, ab, _, signer = newTestKademlia(&conns, nil, kademlia.Options{SaturationFunc: saturationFunc, BitSuffixLength: 2})
)
// implement satiration function (while having access to Kademlia instance)
sfImpl := func(bin uint8, peers, connected *pslice.PSlice) (bool, bool) {
return kad.IsBalanced(bin), false
}
saturationFuncImpl = &sfImpl
if err := kad.Start(context.Background()); err != nil {
t.Fatal(err)
}
defer kad.Close()
// add peers for bin '0', enough to have balanced connections
for i := 0; i < 20; i++ {
addr := test.RandomAddressAt(base, 0)
addOne(t, signer, kad, ab, addr)
}
waitBalanced(t, kad, 0)
// add peers for other bins, enough to have balanced connections
for i := 1; i <= int(swarm.MaxPO); i++ {
for j := 0; j < 20; j++ {
addr := test.RandomAddressAt(base, i)
addOne(t, signer, kad, ab, addr)
}
// sanity check depth
kDepth(t, kad, i)
}
// Without introducing ExtendedPO / ExtendedProximity, we could only have balanced connections until a depth of 12
// That is because, the proximity expected for a balanced connection is Bin + 1 + suffix length
// But, Proximity(one, other) is limited to return MaxPO.
// So, when we get to 1 + suffix length near MaxPO, our expected proximity is not returned,
// even if the addresses match in the expected number of bits, because of the MaxPO limiting
// Without extendedPO, suffix length is 2, + 1 = 3, MaxPO is 15,
// so we could only have balanced connections for up until bin 12, but not bin 13,
// as we would be expecting proximity of pseudoaddress-balancedConnection as 16 and get 15 only
for i := 1; i <= int(swarm.MaxPO); i++ {
waitBalanced(t, kad, uint8(i))
}
}
// TestBinSaturation tests the builtin binSaturated function. // TestBinSaturation tests the builtin binSaturated function.
// the test must have two phases of adding peers so that the section // the test must have two phases of adding peers so that the section
// beyond the first flow control statement gets hit (if po >= depth), // beyond the first flow control statement gets hit (if po >= depth),
...@@ -216,7 +275,7 @@ func TestBinSaturation(t *testing.T) { ...@@ -216,7 +275,7 @@ func TestBinSaturation(t *testing.T) {
var ( var (
conns int32 // how many connect calls were made to the p2p mock conns int32 // how many connect calls were made to the p2p mock
base, kad, ab, _, signer = newTestKademlia(&conns, nil, kademlia.Options{}) base, kad, ab, _, signer = newTestKademlia(&conns, nil, kademlia.Options{BitSuffixLength: -1})
peers []swarm.Address peers []swarm.Address
) )
...@@ -1033,3 +1092,23 @@ func isIn(addr swarm.Address, addrs []swarm.Address) bool { ...@@ -1033,3 +1092,23 @@ func isIn(addr swarm.Address, addrs []swarm.Address) bool {
} }
return false return false
} }
// waitBalanced waits for kademlia to be balanced for specified bin.
func waitBalanced(t *testing.T, k *kademlia.Kad, bin uint8) {
t.Helper()
timeout := time.After(3 * time.Second)
for {
select {
case <-timeout:
t.Fatalf("timed out waiting to be balanced for bin: %d", int(bin))
default:
}
if balanced := k.IsBalanced(bin); balanced {
return
}
time.Sleep(50 * time.Millisecond)
}
}
...@@ -36,3 +36,23 @@ func Proximity(one, other []byte) (ret uint8) { ...@@ -36,3 +36,23 @@ func Proximity(one, other []byte) (ret uint8) {
} }
return MaxPO return MaxPO
} }
func ExtendedProximity(one, other []byte) (ret uint8) {
b := ExtendedPO/8 + 1
if l := uint8(len(one)); b > l {
b = l
}
if l := uint8(len(other)); b > l {
b = l
}
var m uint8 = 8
for i := uint8(0); i < b; i++ {
oxo := one[i] ^ other[i]
for j := uint8(0); j < m; j++ {
if (oxo>>(7-j))&0x01 != 0 {
return i*8 + j
}
}
}
return ExtendedPO
}
...@@ -24,6 +24,7 @@ const ( ...@@ -24,6 +24,7 @@ const (
ChunkSize = SectionSize * Branches ChunkSize = SectionSize * Branches
HashSize = 32 HashSize = 32
MaxPO uint8 = 15 MaxPO uint8 = 15
ExtendedPO uint8 = MaxPO + 5
MaxBins = MaxPO + 1 MaxBins = MaxPO + 1
ChunkWithSpanSize = ChunkSize + SpanSize ChunkWithSpanSize = ChunkSize + SpanSize
) )
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment