Commit 5bf20562 authored by acud's avatar acud Committed by GitHub

kademlia, topology: introduce pslice (#169)

* kademlia, topology: introduce pslice
parent 39849d2b
// Copyright 2020 The Swarm Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package pslice
import "github.com/ethersphere/bee/pkg/swarm"
func PSlicePeers(p *PSlice) []swarm.Address {
return p.peers
}
func PSliceBins(p *PSlice) []uint {
return p.bins
}
// Copyright 2020 The Swarm Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package pslice
import (
"sync"
"github.com/ethersphere/bee/pkg/swarm"
"github.com/ethersphere/bee/pkg/topology"
)
// PSlice maintains a list of addresses, indexing them by their different proximity orders.
// Currently, when peers are added or removed, their proximity order must be supplied, this is
// in order to reduce duplicate PO calculation which is normally known and already needed in the
// calling context.
type PSlice struct {
peers []swarm.Address
bins []uint
sync.Mutex
}
// New creates a new PSlice.
func New(maxBins int) *PSlice {
return &PSlice{
peers: make([]swarm.Address, 0),
bins: make([]uint, maxBins),
}
}
// iterates over all peers from deepest bin to shallowest.
func (s *PSlice) EachBin(pf topology.EachPeerFunc) error {
s.Lock()
defer s.Unlock()
if len(s.peers) == 0 {
return nil
}
var binEnd = uint(len(s.peers))
for i := len(s.bins) - 1; i >= 0; i-- {
peers := s.peers[s.bins[i]:binEnd]
for _, v := range peers {
stop, next, err := pf(v, uint8(i))
if err != nil {
return err
}
if stop {
return nil
}
if next {
break
}
}
binEnd = s.bins[i]
}
return nil
}
// EachBinRev iterates over all peers from shallowest to deepest.
func (s *PSlice) EachBinRev(pf topology.EachPeerFunc) error {
s.Lock()
defer s.Unlock()
if len(s.peers) == 0 {
return nil
}
var binEnd int
for i := 0; i <= len(s.bins)-1; i++ {
if i == len(s.bins)-1 {
binEnd = len(s.peers)
} else {
binEnd = int(s.bins[i+1])
}
peers := s.peers[s.bins[i]:binEnd]
for _, v := range peers {
stop, next, err := pf(v, uint8(i))
if err != nil {
return err
}
if stop {
return nil
}
if next {
break
}
}
}
return nil
}
// ShallowestEmpty returns the shallowest empty bin if one exists.
// If such bin does not exists, returns true as bool value.
func (s *PSlice) ShallowestEmpty() (bin uint8, none bool) {
s.Lock()
defer s.Unlock()
binCp := make([]uint, len(s.bins)+1)
copy(binCp, s.bins)
binCp[len(binCp)-1] = uint(len(s.peers))
for i := uint8(0); i < uint8(len(binCp)-1); i++ {
if binCp[i+1] == binCp[i] {
return i, false
}
}
return 0, true
}
// Exists checks if a peer exists.
func (s *PSlice) Exists(addr swarm.Address) bool {
s.Lock()
defer s.Unlock()
b, _ := s.exists(addr)
return b
}
// checks if a peer exists. must be called under lock.
func (s *PSlice) exists(addr swarm.Address) (bool, int) {
if len(s.peers) == 0 {
return false, 0
}
for i, a := range s.peers {
if a.Equal(addr) {
return true, i
}
}
return false, 0
}
// Add a peer at a certain PO.
func (s *PSlice) Add(addr swarm.Address, po uint8) {
s.Lock()
defer s.Unlock()
if e, _ := s.exists(addr); e {
return
}
head := s.peers[:s.bins[po]]
tail := append([]swarm.Address{addr}, s.peers[s.bins[po]:]...)
s.peers = append(head, tail...)
s.incDeeper(po)
}
// Remove a peer at a certain PO.
func (s *PSlice) Remove(addr swarm.Address, po uint8) {
s.Lock()
defer s.Unlock()
e, i := s.exists(addr)
if !e {
return
}
s.peers = append(s.peers[:i], s.peers[i+1:]...)
s.decDeeper(po)
}
// incDeeper increments the peers slice bin index for proximity order > po.
// Must be called under lock.
func (s *PSlice) incDeeper(po uint8) {
if po > uint8(len(s.bins)) {
panic("po too high")
}
for i := po + 1; i < uint8(len(s.bins)); i++ {
// don't increment if the value in k.bins == len(k.peers)
// otherwise the calling context gets an out of bound error
// when accessing the slice
if s.bins[i] < uint(len(s.peers)) {
s.bins[i]++
}
}
}
// decDeeper decrements the peers slice bin indexes for proximity order > po.
// Must be called under lock.
func (s *PSlice) decDeeper(po uint8) {
if po > uint8(len(s.bins)) {
panic("po too high")
}
for i := po + 1; i < uint8(len(s.bins)); i++ {
s.bins[i]--
}
}
// Copyright 2020 The Swarm Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package pslice_test
import (
"errors"
"testing"
"github.com/ethersphere/bee/pkg/kademlia/pslice"
"github.com/ethersphere/bee/pkg/swarm"
"github.com/ethersphere/bee/pkg/topology/test"
)
// TestShallowestEmpty tests that ShallowestEmpty functionality works correctly.
func TestShallowestEmpty(t *testing.T) {
var (
ps = pslice.New(16)
base = test.RandomAddress()
peers = make([][]swarm.Address, 16)
)
for i := 0; i < 16; i++ {
for j := 0; j < 3; j++ {
a := test.RandomAddressAt(base, i)
peers[i] = append(peers[i], a)
}
}
for i, v := range peers {
for _, vv := range v {
ps.Add(vv, uint8(i))
}
sd, none := ps.ShallowestEmpty()
if i == 15 {
if !none {
t.Fatal("expected last bin to be empty, thus return no empty bins true")
}
} else {
if sd != uint8(i+1) {
t.Fatalf("expected shallow empty bin to be %d but got %d", i+1, sd)
}
if none {
t.Fatal("got no empty bins but wanted some")
}
}
}
// this part removes peers in certain bins and asserts
// that the shallowest empty bin behaves correctly once the bins
for _, tc := range []struct {
removePo int
expectShallowest uint8
}{
{
removePo: 3,
expectShallowest: 3,
}, {
removePo: 1,
expectShallowest: 1,
}, {
removePo: 10,
expectShallowest: 1,
}, {
removePo: 15,
expectShallowest: 1,
}, {
removePo: 14,
expectShallowest: 1,
}, {
removePo: 0,
expectShallowest: 0,
},
} {
for _, v := range peers[tc.removePo] {
po := uint8(swarm.Proximity(base.Bytes(), v.Bytes()))
ps.Remove(v, po)
}
sd, none := ps.ShallowestEmpty()
if sd != tc.expectShallowest || none {
t.Fatalf("empty bin mismatch got %d want %d", sd, tc.expectShallowest)
}
}
ps.Add(peers[0][0], 0)
if sd, none := ps.ShallowestEmpty(); sd != 1 || none {
t.Fatalf("expected bin 1 to be empty shallowest but got %d", sd)
}
}
// TestAddRemove checks that the Add, Remove and Exists methods work as expected.
func TestAddRemove(t *testing.T) {
var (
ps = pslice.New(4)
base = test.RandomAddress()
peers = make([]swarm.Address, 8)
)
// 2 peers per bin
// indexes {0,1} {2,3} {4,5} {6,7}
for i := 0; i < 8; i += 2 {
a := test.RandomAddressAt(base, i)
peers[i] = a
b := test.RandomAddressAt(base, i)
peers[i+1] = b
}
// add one
ps.Add(peers[0], 0)
chkLen(t, ps, 1)
chkExists(t, ps, peers[:1]...)
chkNotExists(t, ps, peers[1:]...)
// check duplicates
ps.Add(peers[0], 0)
chkLen(t, ps, 1)
chkBins(t, ps, []uint{0, 1, 1, 1})
chkExists(t, ps, peers[:1]...)
chkNotExists(t, ps, peers[1:]...)
// check empty
ps.Remove(peers[0], 0)
chkLen(t, ps, 0)
chkBins(t, ps, []uint{0, 0, 0, 0})
chkNotExists(t, ps, peers...)
// add two in bin 0
ps.Add(peers[0], 0)
ps.Add(peers[1], 0)
chkLen(t, ps, 2)
chkBins(t, ps, []uint{0, 2, 2, 2})
chkExists(t, ps, peers[:2]...)
chkNotExists(t, ps, peers[2:]...)
ps.Add(peers[2], 1)
ps.Add(peers[3], 1)
chkLen(t, ps, 4)
chkBins(t, ps, []uint{0, 2, 4, 4})
chkExists(t, ps, peers[:4]...)
chkNotExists(t, ps, peers[4:]...)
ps.Remove(peers[1], 0)
chkLen(t, ps, 3)
chkBins(t, ps, []uint{0, 1, 3, 3})
chkExists(t, ps, peers[0], peers[2], peers[3])
chkNotExists(t, ps, append([]swarm.Address{peers[1]}, peers[4:]...)...)
// this should not move the last cursor
ps.Add(peers[7], 3)
chkLen(t, ps, 4)
chkBins(t, ps, []uint{0, 1, 3, 3})
chkExists(t, ps, peers[0], peers[2], peers[3], peers[7])
chkNotExists(t, ps, append([]swarm.Address{peers[1]}, peers[4:7]...)...)
ps.Add(peers[5], 2)
chkLen(t, ps, 5)
chkBins(t, ps, []uint{0, 1, 3, 4})
chkExists(t, ps, peers[0], peers[2], peers[3], peers[5], peers[7])
chkNotExists(t, ps, []swarm.Address{peers[1], peers[4], peers[6]}...)
ps.Remove(peers[2], 1)
chkLen(t, ps, 4)
chkBins(t, ps, []uint{0, 1, 2, 3})
chkExists(t, ps, peers[0], peers[3], peers[5], peers[7])
chkNotExists(t, ps, []swarm.Address{peers[1], peers[2], peers[4], peers[6]}...)
p := uint8(0)
for i := 0; i < 8; i += 2 {
ps.Remove(peers[i], p)
ps.Remove(peers[i+1], p)
p++
}
// check empty again
chkLen(t, ps, 0)
chkBins(t, ps, []uint{0, 0, 0, 0})
chkNotExists(t, ps, peers...)
}
// TestIteratorError checks that error propagation works correctly in the iterators.
func TestIteratorError(t *testing.T) {
var (
ps = pslice.New(4)
base = test.RandomAddress()
a = test.RandomAddressAt(base, 0)
e = errors.New("err1")
)
ps.Add(a, 0)
f := func(p swarm.Address, _ uint8) (stop, jumpToNext bool, err error) {
return false, false, e
}
err := ps.EachBin(f)
if !errors.Is(err, e) {
t.Fatal("didnt get expected error")
}
}
// TestIterators tests that the EachBin and EachBinRev iterators work as expected.
func TestIterators(t *testing.T) {
ps := pslice.New(4)
base := test.RandomAddress()
peers := make([]swarm.Address, 4)
for i := 0; i < 4; i++ {
a := test.RandomAddressAt(base, i)
peers[i] = a
}
testIterator(t, ps, false, false, 0, []swarm.Address{})
testIteratorRev(t, ps, false, false, 0, []swarm.Address{})
for i, v := range peers {
ps.Add(v, uint8(i))
}
testIterator(t, ps, false, false, 4, []swarm.Address{peers[3], peers[2], peers[1], peers[0]})
testIteratorRev(t, ps, false, false, 4, peers)
ps.Remove(peers[2], 2)
testIterator(t, ps, false, false, 3, []swarm.Address{peers[3], peers[1], peers[0]})
testIteratorRev(t, ps, false, false, 3, []swarm.Address{peers[0], peers[1], peers[3]})
ps.Remove(peers[0], 0)
testIterator(t, ps, false, false, 2, []swarm.Address{peers[3], peers[1]})
testIteratorRev(t, ps, false, false, 2, []swarm.Address{peers[1], peers[3]})
ps.Remove(peers[3], 3)
testIterator(t, ps, false, false, 1, []swarm.Address{peers[1]})
testIteratorRev(t, ps, false, false, 1, []swarm.Address{peers[1]})
ps.Remove(peers[1], 1)
testIterator(t, ps, false, false, 0, []swarm.Address{})
testIteratorRev(t, ps, false, false, 0, []swarm.Address{})
}
// TestIteratorsJumpStop tests that the EachBin and EachBinRev iterators jump to next bin and stop as expected.
func TestIteratorsJumpStop(t *testing.T) {
ps := pslice.New(4)
base := test.RandomAddress()
peers := make([]swarm.Address, 12)
j := 0
for i := 0; i < 4; i++ {
for ii := 0; ii < 3; ii++ {
a := test.RandomAddressAt(base, i)
peers[j] = a
ps.Add(a, uint8(i))
j++
}
}
// check that jump to next bin works as expected
testIterator(t, ps, true, false, 4, []swarm.Address{peers[11], peers[8], peers[5], peers[2]})
testIteratorRev(t, ps, true, false, 4, []swarm.Address{peers[2], peers[5], peers[8], peers[11]})
// check that the stop functionality works correctly
testIterator(t, ps, true, true, 1, []swarm.Address{peers[11]})
testIteratorRev(t, ps, true, true, 1, []swarm.Address{peers[2]})
}
func testIteratorRev(t *testing.T, ps *pslice.PSlice, skipNext, stop bool, iterations int, peerseq []swarm.Address) {
t.Helper()
i := 0
f := func(p swarm.Address, po uint8) (bool, bool, error) {
if i == iterations {
t.Fatal("too many iterations!")
}
if !p.Equal(peerseq[i]) {
t.Errorf("got wrong peer seq from iterator")
}
i++
return stop, skipNext, nil
}
err := ps.EachBinRev(f)
if err != nil {
t.Fatal(err)
}
}
func testIterator(t *testing.T, ps *pslice.PSlice, skipNext, stop bool, iterations int, peerseq []swarm.Address) {
t.Helper()
i := 0
f := func(p swarm.Address, po uint8) (bool, bool, error) {
if i == iterations {
t.Fatal("too many iterations!")
}
if !p.Equal(peerseq[i]) {
t.Errorf("got wrong peer seq from iterator")
}
i++
return stop, skipNext, nil
}
err := ps.EachBin(f)
if err != nil {
t.Fatal(err)
}
}
func chkLen(t *testing.T, ps *pslice.PSlice, l int) {
pp := pslice.PSlicePeers(ps)
if lp := len(pp); lp != l {
t.Fatalf("length mismatch, want %d got %d", l, lp)
}
}
func chkBins(t *testing.T, ps *pslice.PSlice, seq []uint) {
pb := pslice.PSliceBins(ps)
for i, v := range seq {
if pb[i] != v {
t.Fatalf("bin seq wrong, get %d want %d, index %v", pb[i], v, pb)
}
}
}
func chkExists(t *testing.T, ps *pslice.PSlice, addrs ...swarm.Address) {
t.Helper()
for _, a := range addrs {
if !ps.Exists(a) {
t.Fatalf("peer %s does not exist but should have", a.String())
}
}
}
func chkNotExists(t *testing.T, ps *pslice.PSlice, addrs ...swarm.Address) {
t.Helper()
for _, a := range addrs {
if ps.Exists(a) {
t.Fatalf("peer %s does exists but should have not", a.String())
}
}
}
// Copyright 2020 The Swarm Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package test
import (
"fmt"
"math/rand"
"github.com/ethersphere/bee/pkg/swarm"
)
// RandomAddressAt generates a random address
// at proximity order prox relative to address.
func RandomAddressAt(self swarm.Address, prox int) swarm.Address {
addr := make([]byte, len(self.Bytes()))
copy(addr, self.Bytes())
pos := -1
if prox >= 0 {
pos = prox / 8
trans := prox % 8
transbytea := byte(0)
for j := 0; j <= trans; j++ {
transbytea |= 1 << uint8(7-j)
}
flipbyte := byte(1 << uint8(7-trans))
transbyteb := transbytea ^ byte(255)
randbyte := byte(rand.Intn(255))
addr[pos] = ((addr[pos] & transbytea) ^ flipbyte) | randbyte&transbyteb
}
for i := pos + 1; i < len(addr); i++ {
addr[i] = byte(rand.Intn(255))
}
a := swarm.NewAddress(addr)
if a.Equal(self) {
panic(fmt.Sprint(a.String(), self.String()))
}
return a
}
// RandomAddress generates a random address.
func RandomAddress() swarm.Address {
b := make([]byte, 32)
return RandomAddressAt(swarm.NewAddress(b), -1)
}
// Copyright 2020 The Swarm Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package test_test
import (
"encoding/binary"
"testing"
"github.com/ethersphere/bee/pkg/swarm"
"github.com/ethersphere/bee/pkg/topology/test"
)
// TestRandomAddressAt checks that RandomAddressAt generates a correct random address
// at a given proximity order. It compares the number of leading equal bits in the generated
// address to the base address.
func TestRandomAddressAt(t *testing.T) {
base := swarm.MustParseHexAddress("ca1e9f3938cc1425c6061b96ad9eb93e134dfe8734ad490164ef20af9d1cf59c")
b0 := base.Bytes()
hw0 := []byte{b0[0], b0[1], 0, 0} // highest words of base address
hw0int := binary.BigEndian.Uint32(hw0)
for bitsInCommon := 0; bitsInCommon < 30; bitsInCommon++ {
addr := test.RandomAddressAt(base, bitsInCommon)
b1 := addr.Bytes()
hw1 := []byte{b1[0], b1[1], 0, 0} // highest words of 1
hw1int := binary.BigEndian.Uint32(hw1)
//bb0 is the bit mask to AND with hw0 and hw1
bb0 := uint32(0)
for i := 0; i < bitsInCommon; i++ {
bb0 |= (1 << (31 - i))
}
andhw0 := hw0int & bb0
andhw1 := hw1int & bb0
// the result of the AND with both highest words of b0 and b1 should be equal
if andhw0 != andhw1 {
t.Fatalf("hw0 %08b hw1 %08b mask %08b &0 %08b &1 %08b", hw0int, hw1int, bb0, andhw0, andhw1)
}
}
}
......@@ -28,3 +28,6 @@ type PeerAdder interface {
type ClosestPeerer interface {
ClosestPeer(addr swarm.Address) (peerAddr swarm.Address, err error)
}
// EachPeerFunc is a callback that is called with a peer and its PO
type EachPeerFunc func(swarm.Address, uint8) (stop, jumpToNext bool, err error)
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment