Commit 4eeb6fee authored by Petar Radovic's avatar Petar Radovic Committed by GitHub

Hive with signature + use Bzz address accross Bee (#227)

* Use bzz address accross bee

* Hive with signature
parent 36b3c5f3
......@@ -5,14 +5,12 @@
package addressbook
import (
"encoding/json"
"fmt"
"strings"
"github.com/ethersphere/bee/pkg/bzz"
"github.com/ethersphere/bee/pkg/storage"
"github.com/ethersphere/bee/pkg/swarm"
ma "github.com/multiformats/go-multiaddr"
)
const keyPrefix = "addressbook_entry_"
......@@ -22,7 +20,7 @@ var _ Interface = (*store)(nil)
type Interface interface {
GetPutter
Overlays() ([]swarm.Address, error)
Multiaddresses() ([]ma.Multiaddr, error)
Addresses() ([]bzz.Address, error)
}
type GetPutter interface {
......@@ -31,11 +29,11 @@ type GetPutter interface {
}
type Getter interface {
Get(overlay swarm.Address) (addr ma.Multiaddr, err error)
Get(overlay swarm.Address) (addr bzz.Address, err error)
}
type Putter interface {
Put(overlay swarm.Address, addr ma.Multiaddr) (err error)
Put(overlay swarm.Address, addr bzz.Address) (err error)
}
type store struct {
......@@ -48,22 +46,19 @@ func New(storer storage.StateStorer) Interface {
}
}
func (s *store) Get(overlay swarm.Address) (ma.Multiaddr, error) {
func (s *store) Get(overlay swarm.Address) (bzz.Address, error) {
key := keyPrefix + overlay.String()
v := PeerEntry{}
v := bzz.Address{}
err := s.store.Get(key, &v)
if err != nil {
return nil, err
return bzz.Address{}, err
}
return v.Multiaddr, nil
return v, nil
}
func (s *store) Put(overlay swarm.Address, addr ma.Multiaddr) (err error) {
func (s *store) Put(overlay swarm.Address, addr bzz.Address) (err error) {
key := keyPrefix + overlay.String()
pe := &PeerEntry{Overlay: overlay, Multiaddr: addr}
return s.store.Put(key, pe)
return s.store.Put(key, &addr)
}
func (s *store) Overlays() (overlays []swarm.Address, err error) {
......@@ -90,63 +85,20 @@ func (s *store) Overlays() (overlays []swarm.Address, err error) {
return overlays, nil
}
func (s *store) Multiaddresses() (multis []ma.Multiaddr, err error) {
func (s *store) Addresses() (addresses []bzz.Address, err error) {
err = s.store.Iterate(keyPrefix, func(_, value []byte) (stop bool, err error) {
entry := &PeerEntry{}
entry := &bzz.Address{}
err = entry.UnmarshalJSON(value)
if err != nil {
return true, err
}
multis = append(multis, entry.Multiaddr)
addresses = append(addresses, *entry)
return false, nil
})
if err != nil {
return nil, err
}
return multis, nil
}
type PeerEntry struct {
Overlay swarm.Address
Multiaddr ma.Multiaddr
}
func (p *PeerEntry) MarshalJSON() ([]byte, error) {
v := struct {
Overlay string
Multiaddr string
}{
Overlay: p.Overlay.String(),
Multiaddr: p.Multiaddr.String(),
}
return json.Marshal(&v)
}
func (p *PeerEntry) UnmarshalJSON(b []byte) error {
v := struct {
Overlay string
Multiaddr string
}{}
err := json.Unmarshal(b, &v)
if err != nil {
return err
}
a, err := swarm.ParseHexAddress(v.Overlay)
if err != nil {
return err
}
p.Overlay = a
m, err := ma.NewMultiaddr(v.Multiaddr)
if err != nil {
return err
}
p.Multiaddr = m
return nil
return addresses, nil
}
......@@ -8,16 +8,18 @@ import (
"testing"
"github.com/ethersphere/bee/pkg/addressbook"
"github.com/ethersphere/bee/pkg/bzz"
"github.com/ethersphere/bee/pkg/crypto"
"github.com/ethersphere/bee/pkg/statestore/mock"
"github.com/ethersphere/bee/pkg/swarm"
ma "github.com/multiformats/go-multiaddr"
)
type bookFunc func(t *testing.T) (book addressbook.GetPutter)
type bookFunc func(t *testing.T) (book addressbook.Interface)
func TestInMem(t *testing.T) {
run(t, func(t *testing.T) addressbook.GetPutter {
run(t, func(t *testing.T) addressbook.Interface {
store := mock.NewStateStore()
book := addressbook.New(store)
......@@ -27,7 +29,6 @@ func TestInMem(t *testing.T) {
func run(t *testing.T, f bookFunc) {
store := f(t)
addr1 := swarm.NewAddress([]byte{0, 1, 2, 3})
addr2 := swarm.NewAddress([]byte{0, 1, 2, 4})
multiaddr, err := ma.NewMultiaddr("/ip4/1.1.1.1")
......@@ -35,7 +36,17 @@ func run(t *testing.T, f bookFunc) {
t.Fatal(err)
}
err = store.Put(addr1, multiaddr)
pk, err := crypto.GenerateSecp256k1Key()
if err != nil {
t.Fatal(err)
}
bzzAddr, err := bzz.NewAddress(crypto.NewDefaultSigner(pk), multiaddr, addr1, 1)
if err != nil {
t.Fatal(err)
}
err = store.Put(addr1, *bzzAddr)
if err != nil {
t.Fatal(err)
}
......@@ -50,7 +61,25 @@ func run(t *testing.T, f bookFunc) {
t.Fatal("value found in store but should not have been")
}
if multiaddr.String() != v.String() {
if !bzzAddr.Equal(&v) {
t.Fatalf("value retrieved from store not equal to original stored address: %v, want %v", v, multiaddr)
}
overlays, err := store.Overlays()
if err != nil {
t.Fatal(err)
}
if len(overlays) != 1 {
t.Fatalf("expected overlay len %v, got %v", 1, len(overlays))
}
addresses, err := store.Addresses()
if err != nil {
t.Fatal(err)
}
if len(addresses) != 1 {
t.Fatalf("expected addresses len %v, got %v", 1, len(addresses))
}
}
// Copyright 2020 The Swarm Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package bzz
import (
"bytes"
"encoding/base64"
"encoding/binary"
"encoding/json"
"errors"
"github.com/ethersphere/bee/pkg/crypto"
"github.com/ethersphere/bee/pkg/swarm"
ma "github.com/multiformats/go-multiaddr"
)
var ErrInvalidAddress = errors.New("invalid address")
// Address represents the bzz address in swarm.
// It consists of a peers underlay (physical) address, overlay (topology) address and signature.
// Signature is used to verify the `Overlay/Underlay` pair, as it is based on `underlay|networkID`, signed with the public key of Overlay address
type Address struct {
Underlay ma.Multiaddr
Overlay swarm.Address
Signature []byte
}
type addressJSON struct {
Overlay string `json:"overlay"`
Underlay string `json:"underlay"`
Signature string `json:"signature"`
}
func NewAddress(signer crypto.Signer, underlay ma.Multiaddr, overlay swarm.Address, networkID uint64) (*Address, error) {
networkIDBytes := make([]byte, 8)
binary.BigEndian.PutUint64(networkIDBytes, networkID)
signature, err := signer.Sign(append(underlay.Bytes(), networkIDBytes...))
if err != nil {
return nil, err
}
return &Address{
Underlay: underlay,
Overlay: overlay,
Signature: signature,
}, nil
}
func ParseAddress(underlay, overlay, signature []byte, networkID uint64) (*Address, error) {
networkIDBytes := make([]byte, 8)
binary.BigEndian.PutUint64(networkIDBytes, networkID)
recoveredPK, err := crypto.Recover(signature, append(underlay, networkIDBytes...))
if err != nil {
return nil, ErrInvalidAddress
}
recoveredOverlay := crypto.NewOverlayAddress(*recoveredPK, networkID)
if !bytes.Equal(recoveredOverlay.Bytes(), overlay) {
return nil, ErrInvalidAddress
}
multiUnderlay, err := ma.NewMultiaddrBytes(underlay)
if err != nil {
return nil, ErrInvalidAddress
}
return &Address{
Underlay: multiUnderlay,
Overlay: swarm.NewAddress(overlay),
Signature: signature,
}, nil
}
func (a *Address) Equal(b *Address) bool {
return a.Overlay.Equal(b.Overlay) && a.Underlay.Equal(b.Underlay) && bytes.Equal(a.Signature, b.Signature)
}
func (p *Address) MarshalJSON() ([]byte, error) {
return json.Marshal(&addressJSON{
Overlay: p.Overlay.String(),
Underlay: p.Underlay.String(),
Signature: base64.StdEncoding.EncodeToString(p.Signature),
})
}
func (p *Address) UnmarshalJSON(b []byte) error {
v := &addressJSON{}
err := json.Unmarshal(b, v)
if err != nil {
return err
}
a, err := swarm.ParseHexAddress(v.Overlay)
if err != nil {
return err
}
p.Overlay = a
m, err := ma.NewMultiaddr(v.Underlay)
if err != nil {
return err
}
p.Underlay = m
p.Signature, err = base64.StdEncoding.DecodeString(v.Signature)
return err
}
// Copyright 2020 The Swarm Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package bzz_test
import (
"testing"
"github.com/ethersphere/bee/pkg/bzz"
"github.com/ethersphere/bee/pkg/crypto"
ma "github.com/multiformats/go-multiaddr"
)
func TestBzzAddress(t *testing.T) {
node1ma, err := ma.NewMultiaddr("/ip4/127.0.0.1/tcp/7070/p2p/16Uiu2HAkx8ULY8cTXhdVAcMmLcH9AsTKz6uBQ7DPLKRjMLgBVYkA")
if err != nil {
t.Fatal(err)
}
privateKey1, err := crypto.GenerateSecp256k1Key()
if err != nil {
t.Fatal(err)
}
overlay := crypto.NewOverlayAddress(privateKey1.PublicKey, 3)
signer1 := crypto.NewDefaultSigner(privateKey1)
bzzAddress, err := bzz.NewAddress(signer1, node1ma, overlay, 3)
if err != nil {
t.Fatal(err)
}
bzzAddress2, err := bzz.ParseAddress(node1ma.Bytes(), overlay.Bytes(), bzzAddress.Signature, 3)
if err != nil {
t.Fatal(err)
}
if !bzzAddress.Equal(bzzAddress2) {
t.Fatalf("got %s expected %s", bzzAddress2, bzzAddress)
}
bytes, err := bzzAddress.MarshalJSON()
if err != nil {
t.Fatal(err)
}
var newbzz bzz.Address
if err := newbzz.UnmarshalJSON(bytes); err != nil {
t.Fatal(err)
}
if !newbzz.Equal(bzzAddress) {
t.Fatalf("got %s expected %s", newbzz, bzzAddress)
}
}
......@@ -48,8 +48,8 @@ func newTestServer(t *testing.T, o testServerOptions) *testServer {
P2P: o.P2P,
Logger: logging.New(ioutil.Discard, 0),
Addressbook: addrbook,
TopologyDriver: topologyDriver,
Storer: o.Storer,
TopologyDriver: topologyDriver,
})
ts := httptest.NewServer(s)
t.Cleanup(ts.Close)
......@@ -65,9 +65,8 @@ func newTestServer(t *testing.T, o testServerOptions) *testServer {
}),
}
return &testServer{
Client: client,
Addressbook: addrbook,
TopologyDriver: topologyDriver,
Client: client,
Addressbook: addrbook,
}
}
......
......@@ -35,21 +35,6 @@ func (s *server) peerConnectHandler(w http.ResponseWriter, r *http.Request) {
return
}
err = s.Addressbook.Put(address, addr)
if err != nil {
s.Logger.Debugf("debug api: addressbook.put %s: %v", addr, err)
s.Logger.Errorf("unable to persist peer %s", addr)
jsonhttp.InternalServerError(w, err)
return
}
if err := s.TopologyDriver.Connected(r.Context(), address); err != nil {
_ = s.P2P.Disconnect(address)
s.Logger.Debugf("debug api: topologyDriver.Connected %s: %v", addr, err)
s.Logger.Errorf("unable to connect to peer %s", addr)
jsonhttp.InternalServerError(w, err)
return
}
jsonhttp.OK(w, peerConnectResponse{
Address: address.String(),
})
......
......@@ -15,9 +15,7 @@ import (
"github.com/ethersphere/bee/pkg/jsonhttp/jsonhttptest"
"github.com/ethersphere/bee/pkg/p2p"
"github.com/ethersphere/bee/pkg/p2p/mock"
"github.com/ethersphere/bee/pkg/storage"
"github.com/ethersphere/bee/pkg/swarm"
topmock "github.com/ethersphere/bee/pkg/topology/mock"
ma "github.com/multiformats/go-multiaddr"
)
......@@ -40,11 +38,6 @@ func TestConnect(t *testing.T) {
jsonhttptest.ResponseDirect(t, testServer.Client, http.MethodPost, "/connect"+underlay, nil, http.StatusOK, debugapi.PeerConnectResponse{
Address: overlay.String(),
})
multia, err := testServer.Addressbook.Get(overlay)
if err != nil && errors.Is(err, storage.ErrNotFound) && underlay != multia.String() {
t.Fatalf("found wrong underlay. expected: %s, found: %s", underlay, multia.String())
}
})
t.Run("error", func(t *testing.T) {
......@@ -60,36 +53,6 @@ func TestConnect(t *testing.T) {
Message: http.StatusText(http.StatusMethodNotAllowed),
})
})
t.Run("error - add peer", func(t *testing.T) {
disconnectCalled := false
testServer := newTestServer(t, testServerOptions{
P2P: mock.New(mock.WithConnectFunc(func(ctx context.Context, addr ma.Multiaddr) (swarm.Address, error) {
if addr.String() == errorUnderlay {
return swarm.Address{}, testErr
}
return overlay, nil
}), mock.WithDisconnectFunc(func(addr swarm.Address) error {
disconnectCalled = true
return nil
})),
TopologyOpts: []topmock.Option{topmock.WithAddPeerErr(testErr)},
})
jsonhttptest.ResponseDirect(t, testServer.Client, http.MethodPost, "/connect"+underlay, nil, http.StatusInternalServerError, jsonhttp.StatusResponse{
Code: http.StatusInternalServerError,
Message: testErr.Error(),
})
multia, err := testServer.Addressbook.Get(overlay)
if err != nil && errors.Is(err, storage.ErrNotFound) && underlay != multia.String() {
t.Fatalf("found wrong underlay. expected: %s, found: %s", underlay, multia.String())
}
if !disconnectCalled {
t.Fatalf("disconnect not called.")
}
})
}
func TestDisconnect(t *testing.T) {
......@@ -143,7 +106,6 @@ func TestDisconnect(t *testing.T) {
func TestPeer(t *testing.T) {
overlay := swarm.MustParseHexAddress("ca1e9f3938cc1425c6061b96ad9eb93e134dfe8734ad490164ef20af9d1cf59c")
testServer := newTestServer(t, testServerOptions{
P2P: mock.New(mock.WithPeersFunc(func() []p2p.Peer {
return []p2p.Peer{{Address: overlay}}
......
......@@ -11,14 +11,13 @@ import (
"time"
"github.com/ethersphere/bee/pkg/addressbook"
"github.com/ethersphere/bee/pkg/bzz"
"github.com/ethersphere/bee/pkg/hive/pb"
"github.com/ethersphere/bee/pkg/logging"
"github.com/ethersphere/bee/pkg/p2p"
"github.com/ethersphere/bee/pkg/p2p/protobuf"
"github.com/ethersphere/bee/pkg/storage"
"github.com/ethersphere/bee/pkg/swarm"
ma "github.com/multiformats/go-multiaddr"
)
const (
......@@ -26,19 +25,21 @@ const (
protocolVersion = "1.0.0"
peersStreamName = "peers"
messageTimeout = 1 * time.Minute // maximum allowed time for a message to be read or written.
maxBatchSize = 50
maxBatchSize = 30
)
type Service struct {
streamer p2p.Streamer
addressBook addressbook.GetPutter
peerHandler func(context.Context, swarm.Address) error
networkID uint64
logger logging.Logger
}
type Options struct {
Streamer p2p.Streamer
AddressBook addressbook.GetPutter
NetworkID uint64
Logger logging.Logger
}
......@@ -47,6 +48,7 @@ func New(o Options) *Service {
streamer: o.Streamer,
logger: o.Logger,
addressBook: o.AddressBook,
networkID: o.NetworkID,
}
}
......@@ -96,15 +98,16 @@ func (s *Service) sendPeers(ctx context.Context, peer swarm.Address, peers []swa
addr, err := s.addressBook.Get(p)
if err != nil {
if errors.Is(err, storage.ErrNotFound) {
s.logger.Debugf("Peer not found %s", peer, err)
s.logger.Debugf("Peer not found %s", p)
continue
}
return err
}
peersRequest.Peers = append(peersRequest.Peers, &pb.BzzAddress{
Overlay: p.Bytes(),
Underlay: addr.String(),
Overlay: addr.Overlay.Bytes(),
Underlay: addr.Underlay.Bytes(),
Signature: addr.Signature,
})
}
......@@ -127,18 +130,20 @@ func (s *Service) peersHandler(_ context.Context, peer p2p.Peer, stream p2p.Stre
return fmt.Errorf("close stream: %w", err)
}
for _, newPeer := range peersReq.Peers {
addr, err := ma.NewMultiaddr(newPeer.Underlay)
bzzAddress, err := bzz.ParseAddress(newPeer.Underlay, newPeer.Overlay, newPeer.Signature, s.networkID)
if err != nil {
s.logger.Infof("Skipping peer in response %s: %w", newPeer, err)
s.logger.Warningf("skipping peer in response %s: %w", newPeer, err)
continue
}
err = s.addressBook.Put(swarm.NewAddress(newPeer.Overlay), addr)
err = s.addressBook.Put(bzzAddress.Overlay, *bzzAddress)
if err != nil {
return err
s.logger.Warningf("skipping peer in response %s: %w", newPeer, err)
continue
}
if s.peerHandler != nil {
if err := s.peerHandler(context.Background(), swarm.NewAddress(newPeer.Overlay)); err != nil {
if err := s.peerHandler(context.Background(), bzzAddress.Overlay); err != nil {
return err
}
}
......
......@@ -19,6 +19,8 @@ import (
ma "github.com/multiformats/go-multiaddr"
ab "github.com/ethersphere/bee/pkg/addressbook"
"github.com/ethersphere/bee/pkg/bzz"
"github.com/ethersphere/bee/pkg/crypto"
"github.com/ethersphere/bee/pkg/hive"
"github.com/ethersphere/bee/pkg/hive/pb"
"github.com/ethersphere/bee/pkg/logging"
......@@ -33,11 +35,12 @@ func TestBroadcastPeers(t *testing.T) {
logger := logging.New(ioutil.Discard, 0)
statestore := mock.NewStateStore()
addressbook := ab.New(statestore)
networkID := uint64(1)
// populate all expected and needed random resources for 2 full batches
// tests cases that uses fewer resources can use sub-slices of this data
var multiaddrs []ma.Multiaddr
var addrs []swarm.Address
var bzzAddresses []bzz.Address
var overlays []swarm.Address
var wantMsgs []pb.Peers
for i := 0; i < 2; i++ {
......@@ -45,61 +48,71 @@ func TestBroadcastPeers(t *testing.T) {
}
for i := 0; i < 2*hive.MaxBatchSize; i++ {
ma, err := ma.NewMultiaddr("/ip4/127.0.0.1/udp/" + strconv.Itoa(i))
underlay, err := ma.NewMultiaddr("/ip4/127.0.0.1/udp/" + strconv.Itoa(i))
if err != nil {
t.Fatal(err)
}
pk, err := crypto.GenerateSecp256k1Key()
if err != nil {
t.Fatal(err)
}
signer := crypto.NewDefaultSigner(pk)
overlay := crypto.NewOverlayAddress(pk.PublicKey, networkID)
bzzAddr, err := bzz.NewAddress(signer, underlay, overlay, networkID)
if err != nil {
t.Fatal(err)
}
multiaddrs = append(multiaddrs, ma)
addrs = append(addrs, swarm.NewAddress(createRandomBytes()))
err = addressbook.Put(addrs[i], multiaddrs[i])
bzzAddresses = append(bzzAddresses, *bzzAddr)
overlays = append(overlays, bzzAddr.Overlay)
err = addressbook.Put(bzzAddr.Overlay, *bzzAddr)
if err != nil {
t.Fatal(err)
}
wantMsgs[i/hive.MaxBatchSize].Peers = append(wantMsgs[i/hive.MaxBatchSize].Peers, &pb.BzzAddress{Overlay: addrs[i].Bytes(), Underlay: multiaddrs[i].String()})
wantMsgs[i/hive.MaxBatchSize].Peers = append(wantMsgs[i/hive.MaxBatchSize].Peers, &pb.BzzAddress{Overlay: bzzAddresses[i].Overlay.Bytes(), Underlay: bzzAddresses[i].Underlay.Bytes(), Signature: bzzAddresses[i].Signature})
}
testCases := map[string]struct {
addresee swarm.Address
peers []swarm.Address
wantMsgs []pb.Peers
wantOverlays []swarm.Address
wantMultiAddresses []ma.Multiaddr
addresee swarm.Address
peers []swarm.Address
wantMsgs []pb.Peers
wantOverlays []swarm.Address
wantBzzAddresses []bzz.Address
}{
"OK - single record": {
addresee: swarm.MustParseHexAddress("ca1e9f3938cc1425c6061b96ad9eb93e134dfe8734ad490164ef20af9d1cf59c"),
peers: []swarm.Address{addrs[0]},
wantMsgs: []pb.Peers{{Peers: wantMsgs[0].Peers[:1]}},
wantOverlays: []swarm.Address{addrs[0]},
wantMultiAddresses: []ma.Multiaddr{multiaddrs[0]},
addresee: swarm.MustParseHexAddress("ca1e9f3938cc1425c6061b96ad9eb93e134dfe8734ad490164ef20af9d1cf59c"),
peers: []swarm.Address{overlays[0]},
wantMsgs: []pb.Peers{{Peers: wantMsgs[0].Peers[:1]}},
wantOverlays: []swarm.Address{overlays[0]},
wantBzzAddresses: []bzz.Address{bzzAddresses[0]},
},
"OK - single batch - multiple records": {
addresee: swarm.MustParseHexAddress("ca1e9f3938cc1425c6061b96ad9eb93e134dfe8734ad490164ef20af9d1cf59c"),
peers: addrs[:15],
wantMsgs: []pb.Peers{{Peers: wantMsgs[0].Peers[:15]}},
wantOverlays: addrs[:15],
wantMultiAddresses: multiaddrs[:15],
addresee: swarm.MustParseHexAddress("ca1e9f3938cc1425c6061b96ad9eb93e134dfe8734ad490164ef20af9d1cf59c"),
peers: overlays[:15],
wantMsgs: []pb.Peers{{Peers: wantMsgs[0].Peers[:15]}},
wantOverlays: overlays[:15],
wantBzzAddresses: bzzAddresses[:15],
},
"OK - single batch - max number of records": {
addresee: swarm.MustParseHexAddress("ca1e9f3938cc1425c6061b96ad9eb93e134dfe8734ad490164ef20af9d1cf59c"),
peers: addrs[:hive.MaxBatchSize],
wantMsgs: []pb.Peers{{Peers: wantMsgs[0].Peers[:hive.MaxBatchSize]}},
wantOverlays: addrs[:hive.MaxBatchSize],
wantMultiAddresses: multiaddrs[:hive.MaxBatchSize],
addresee: swarm.MustParseHexAddress("ca1e9f3938cc1425c6061b96ad9eb93e134dfe8734ad490164ef20af9d1cf59c"),
peers: overlays[:hive.MaxBatchSize],
wantMsgs: []pb.Peers{{Peers: wantMsgs[0].Peers[:hive.MaxBatchSize]}},
wantOverlays: overlays[:hive.MaxBatchSize],
wantBzzAddresses: bzzAddresses[:hive.MaxBatchSize],
},
"OK - multiple batches": {
addresee: swarm.MustParseHexAddress("ca1e9f3938cc1425c6061b96ad9eb93e134dfe8734ad490164ef20af9d1cf59c"),
peers: addrs[:hive.MaxBatchSize+10],
wantMsgs: []pb.Peers{{Peers: wantMsgs[0].Peers}, {Peers: wantMsgs[1].Peers[:10]}},
wantOverlays: addrs[:hive.MaxBatchSize+10],
wantMultiAddresses: multiaddrs[:hive.MaxBatchSize+10],
addresee: swarm.MustParseHexAddress("ca1e9f3938cc1425c6061b96ad9eb93e134dfe8734ad490164ef20af9d1cf59c"),
peers: overlays[:hive.MaxBatchSize+10],
wantMsgs: []pb.Peers{{Peers: wantMsgs[0].Peers}, {Peers: wantMsgs[1].Peers[:10]}},
wantOverlays: overlays[:hive.MaxBatchSize+10],
wantBzzAddresses: bzzAddresses[:hive.MaxBatchSize+10],
},
"OK - multiple batches - max number of records": {
addresee: swarm.MustParseHexAddress("ca1e9f3938cc1425c6061b96ad9eb93e134dfe8734ad490164ef20af9d1cf59c"),
peers: addrs[:2*hive.MaxBatchSize],
wantMsgs: []pb.Peers{{Peers: wantMsgs[0].Peers}, {Peers: wantMsgs[1].Peers}},
wantOverlays: addrs[:2*hive.MaxBatchSize],
wantMultiAddresses: multiaddrs[:2*hive.MaxBatchSize],
addresee: swarm.MustParseHexAddress("ca1e9f3938cc1425c6061b96ad9eb93e134dfe8734ad490164ef20af9d1cf59c"),
peers: overlays[:2*hive.MaxBatchSize],
wantMsgs: []pb.Peers{{Peers: wantMsgs[0].Peers}, {Peers: wantMsgs[1].Peers}},
wantOverlays: overlays[:2*hive.MaxBatchSize],
wantBzzAddresses: bzzAddresses[:2*hive.MaxBatchSize],
},
}
......@@ -111,6 +124,7 @@ func TestBroadcastPeers(t *testing.T) {
server := hive.New(hive.Options{
Logger: logger,
AddressBook: addressbookclean,
NetworkID: networkID,
})
// setup the stream recorder to record stream data
......@@ -123,6 +137,7 @@ func TestBroadcastPeers(t *testing.T) {
Streamer: recorder,
Logger: logger,
AddressBook: addressbook,
NetworkID: networkID,
})
if err := client.BroadcastPeers(context.Background(), tc.addresee, tc.peers...); err != nil {
......@@ -151,7 +166,7 @@ func TestBroadcastPeers(t *testing.T) {
}
expectOverlaysEventually(t, addressbookclean, tc.wantOverlays)
expectMultiaddresessEventually(t, addressbookclean, tc.wantMultiAddresses)
expectBzzAddresessEventually(t, addressbookclean, tc.wantBzzAddresses)
})
}
}
......@@ -189,37 +204,33 @@ func expectOverlaysEventually(t *testing.T, exporter ab.Interface, wantOverlays
t.Errorf("Overlays got %v, want %v", o, wantOverlays)
}
func expectMultiaddresessEventually(t *testing.T, exporter ab.Interface, wantMultiaddresses []ma.Multiaddr) {
for i := 0; i < 10; i++ {
var stringMultiaddresses []string
m, err := exporter.Multiaddresses()
func expectBzzAddresessEventually(t *testing.T, exporter ab.Interface, wantBzzAddresses []bzz.Address) {
for i := 0; i < 100; i++ {
time.Sleep(50 * time.Millisecond)
addresses, err := exporter.Addresses()
if err != nil {
t.Fatal(err)
}
for _, v := range m {
stringMultiaddresses = append(stringMultiaddresses, v.String())
}
var stringWantMultiAddresses []string
for _, v := range wantMultiaddresses {
stringWantMultiAddresses = append(stringWantMultiAddresses, v.String())
if len(addresses) != len(wantBzzAddresses) {
continue
}
sort.Strings(stringMultiaddresses)
sort.Strings(stringWantMultiAddresses)
if reflect.DeepEqual(stringMultiaddresses, stringWantMultiAddresses) {
return
for i, v := range addresses {
if !v.Equal(&wantBzzAddresses[i]) {
continue
}
}
time.Sleep(50 * time.Millisecond)
return
}
m, err := exporter.Multiaddresses()
m, err := exporter.Addresses()
if err != nil {
t.Fatal(err)
}
t.Errorf("Multiaddresses got %v, want %v", m, wantMultiaddresses)
t.Errorf("Bzz addresses got %v, want %v", m, wantBzzAddresses)
}
func readAndAssertPeersMsgs(in []byte, expectedLen int) ([]pb.Peers, error) {
......@@ -239,16 +250,9 @@ func readAndAssertPeersMsgs(in []byte, expectedLen int) ([]pb.Peers, error) {
}
var peers []pb.Peers
for _, m := range messages {
peers = append(peers, *m.(*pb.Peers))
}
return peers, nil
}
func createRandomBytes() []byte {
randBytes := make([]byte, 32)
rand.Read(randBytes)
return randBytes
}
......@@ -67,8 +67,9 @@ func (m *Peers) GetPeers() []*BzzAddress {
}
type BzzAddress struct {
Overlay []byte `protobuf:"bytes,1,opt,name=Overlay,proto3" json:"Overlay,omitempty"`
Underlay string `protobuf:"bytes,2,opt,name=Underlay,proto3" json:"Underlay,omitempty"`
Underlay []byte `protobuf:"bytes,1,opt,name=Underlay,proto3" json:"Underlay,omitempty"`
Signature []byte `protobuf:"bytes,2,opt,name=Signature,proto3" json:"Signature,omitempty"`
Overlay []byte `protobuf:"bytes,3,opt,name=Overlay,proto3" json:"Overlay,omitempty"`
}
func (m *BzzAddress) Reset() { *m = BzzAddress{} }
......@@ -104,18 +105,25 @@ func (m *BzzAddress) XXX_DiscardUnknown() {
var xxx_messageInfo_BzzAddress proto.InternalMessageInfo
func (m *BzzAddress) GetOverlay() []byte {
func (m *BzzAddress) GetUnderlay() []byte {
if m != nil {
return m.Overlay
return m.Underlay
}
return nil
}
func (m *BzzAddress) GetUnderlay() string {
func (m *BzzAddress) GetSignature() []byte {
if m != nil {
return m.Underlay
return m.Signature
}
return nil
}
func (m *BzzAddress) GetOverlay() []byte {
if m != nil {
return m.Overlay
}
return ""
return nil
}
func init() {
......@@ -126,17 +134,18 @@ func init() {
func init() { proto.RegisterFile("hive.proto", fileDescriptor_d635d1ead41ba02c) }
var fileDescriptor_d635d1ead41ba02c = []byte{
// 158 bytes of a gzipped FileDescriptorProto
// 174 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xe2, 0xe2, 0xca, 0xc8, 0x2c, 0x4b,
0xd5, 0x2b, 0x28, 0xca, 0x2f, 0xc9, 0x17, 0x62, 0x01, 0xb1, 0x95, 0xf4, 0xb9, 0x58, 0x03, 0x52,
0x53, 0x8b, 0x8a, 0x85, 0xd4, 0xb8, 0x58, 0x0b, 0x40, 0x0c, 0x09, 0x46, 0x05, 0x66, 0x0d, 0x6e,
0x23, 0x01, 0x3d, 0xb0, 0x52, 0xa7, 0xaa, 0x2a, 0xc7, 0x94, 0x94, 0xa2, 0xd4, 0xe2, 0xe2, 0x20,
0x88, 0xb4, 0x92, 0x13, 0x17, 0x17, 0x42, 0x50, 0x48, 0x82, 0x8b, 0xdd, 0xbf, 0x2c, 0xb5, 0x28,
0x27, 0xb1, 0x52, 0x82, 0x51, 0x81, 0x51, 0x83, 0x27, 0x08, 0xc6, 0x15, 0x92, 0xe2, 0xe2, 0x08,
0xcd, 0x4b, 0x81, 0x48, 0x31, 0x29, 0x30, 0x6a, 0x70, 0x06, 0xc1, 0xf9, 0x4e, 0x32, 0x27, 0x1e,
0xc9, 0x31, 0x5e, 0x78, 0x24, 0xc7, 0xf8, 0xe0, 0x91, 0x1c, 0xe3, 0x84, 0xc7, 0x72, 0x0c, 0x17,
0x1e, 0xcb, 0x31, 0xdc, 0x78, 0x2c, 0xc7, 0x10, 0xc5, 0x54, 0x90, 0x94, 0xc4, 0x06, 0x76, 0x9f,
0x31, 0x20, 0x00, 0x00, 0xff, 0xff, 0xed, 0x0e, 0xc1, 0x96, 0xad, 0x00, 0x00, 0x00,
0x88, 0xb4, 0x52, 0x02, 0x17, 0x17, 0x42, 0x50, 0x48, 0x8a, 0x8b, 0x23, 0x34, 0x2f, 0x25, 0xb5,
0x28, 0x27, 0xb1, 0x52, 0x82, 0x51, 0x81, 0x51, 0x83, 0x27, 0x08, 0xce, 0x17, 0x92, 0xe1, 0xe2,
0x0c, 0xce, 0x4c, 0xcf, 0x4b, 0x2c, 0x29, 0x2d, 0x4a, 0x95, 0x60, 0x02, 0x4b, 0x22, 0x04, 0x84,
0x24, 0xb8, 0xd8, 0xfd, 0xcb, 0x20, 0x1a, 0x99, 0xc1, 0x72, 0x30, 0xae, 0x93, 0xcc, 0x89, 0x47,
0x72, 0x8c, 0x17, 0x1e, 0xc9, 0x31, 0x3e, 0x78, 0x24, 0xc7, 0x38, 0xe1, 0xb1, 0x1c, 0xc3, 0x85,
0xc7, 0x72, 0x0c, 0x37, 0x1e, 0xcb, 0x31, 0x44, 0x31, 0x15, 0x24, 0x25, 0xb1, 0x81, 0x5d, 0x6f,
0x0c, 0x08, 0x00, 0x00, 0xff, 0xff, 0x2e, 0x11, 0xc0, 0xe9, 0xcb, 0x00, 0x00, 0x00,
}
func (m *Peers) Marshal() (dAtA []byte, err error) {
......@@ -196,18 +205,25 @@ func (m *BzzAddress) MarshalToSizedBuffer(dAtA []byte) (int, error) {
_ = i
var l int
_ = l
if len(m.Underlay) > 0 {
i -= len(m.Underlay)
copy(dAtA[i:], m.Underlay)
i = encodeVarintHive(dAtA, i, uint64(len(m.Underlay)))
i--
dAtA[i] = 0x12
}
if len(m.Overlay) > 0 {
i -= len(m.Overlay)
copy(dAtA[i:], m.Overlay)
i = encodeVarintHive(dAtA, i, uint64(len(m.Overlay)))
i--
dAtA[i] = 0x1a
}
if len(m.Signature) > 0 {
i -= len(m.Signature)
copy(dAtA[i:], m.Signature)
i = encodeVarintHive(dAtA, i, uint64(len(m.Signature)))
i--
dAtA[i] = 0x12
}
if len(m.Underlay) > 0 {
i -= len(m.Underlay)
copy(dAtA[i:], m.Underlay)
i = encodeVarintHive(dAtA, i, uint64(len(m.Underlay)))
i--
dAtA[i] = 0xa
}
return len(dAtA) - i, nil
......@@ -245,11 +261,15 @@ func (m *BzzAddress) Size() (n int) {
}
var l int
_ = l
l = len(m.Overlay)
l = len(m.Underlay)
if l > 0 {
n += 1 + l + sovHive(uint64(l))
}
l = len(m.Underlay)
l = len(m.Signature)
if l > 0 {
n += 1 + l + sovHive(uint64(l))
}
l = len(m.Overlay)
if l > 0 {
n += 1 + l + sovHive(uint64(l))
}
......@@ -380,7 +400,7 @@ func (m *BzzAddress) Unmarshal(dAtA []byte) error {
switch fieldNum {
case 1:
if wireType != 2 {
return fmt.Errorf("proto: wrong wireType = %d for field Overlay", wireType)
return fmt.Errorf("proto: wrong wireType = %d for field Underlay", wireType)
}
var byteLen int
for shift := uint(0); ; shift += 7 {
......@@ -407,16 +427,50 @@ func (m *BzzAddress) Unmarshal(dAtA []byte) error {
if postIndex > l {
return io.ErrUnexpectedEOF
}
m.Overlay = append(m.Overlay[:0], dAtA[iNdEx:postIndex]...)
if m.Overlay == nil {
m.Overlay = []byte{}
m.Underlay = append(m.Underlay[:0], dAtA[iNdEx:postIndex]...)
if m.Underlay == nil {
m.Underlay = []byte{}
}
iNdEx = postIndex
case 2:
if wireType != 2 {
return fmt.Errorf("proto: wrong wireType = %d for field Underlay", wireType)
return fmt.Errorf("proto: wrong wireType = %d for field Signature", wireType)
}
var byteLen int
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowHive
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
byteLen |= int(b&0x7F) << shift
if b < 0x80 {
break
}
}
if byteLen < 0 {
return ErrInvalidLengthHive
}
postIndex := iNdEx + byteLen
if postIndex < 0 {
return ErrInvalidLengthHive
}
if postIndex > l {
return io.ErrUnexpectedEOF
}
m.Signature = append(m.Signature[:0], dAtA[iNdEx:postIndex]...)
if m.Signature == nil {
m.Signature = []byte{}
}
iNdEx = postIndex
case 3:
if wireType != 2 {
return fmt.Errorf("proto: wrong wireType = %d for field Overlay", wireType)
}
var stringLen uint64
var byteLen int
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowHive
......@@ -426,23 +480,25 @@ func (m *BzzAddress) Unmarshal(dAtA []byte) error {
}
b := dAtA[iNdEx]
iNdEx++
stringLen |= uint64(b&0x7F) << shift
byteLen |= int(b&0x7F) << shift
if b < 0x80 {
break
}
}
intStringLen := int(stringLen)
if intStringLen < 0 {
if byteLen < 0 {
return ErrInvalidLengthHive
}
postIndex := iNdEx + intStringLen
postIndex := iNdEx + byteLen
if postIndex < 0 {
return ErrInvalidLengthHive
}
if postIndex > l {
return io.ErrUnexpectedEOF
}
m.Underlay = string(dAtA[iNdEx:postIndex])
m.Overlay = append(m.Overlay[:0], dAtA[iNdEx:postIndex]...)
if m.Overlay == nil {
m.Overlay = []byte{}
}
iNdEx = postIndex
default:
iNdEx = preIndex
......
......@@ -13,6 +13,7 @@ message Peers {
}
message BzzAddress {
bytes Overlay = 1;
string Underlay = 2;
bytes Underlay = 1;
bytes Signature = 2;
bytes Overlay = 3;
}
......@@ -121,7 +121,7 @@ func (k *Kad) manage() {
return false, true, nil // bin is saturated, skip to next bin
}
ma, err := k.addressBook.Get(peer)
bzzAddr, err := k.addressBook.Get(peer)
if err != nil {
// either a peer is not known in the address book, in which case it
// should be removed, or that some severe I/O problem is at hand
......@@ -136,7 +136,7 @@ func (k *Kad) manage() {
}
k.logger.Debugf("kademlia dialing to peer %s", peer.String())
err = k.connect(ctx, peer, ma, po)
err = k.connect(ctx, peer, bzzAddr.Underlay, po)
if err != nil {
k.logger.Errorf("error connecting to peer %s: %v", peer, err)
// continue to next
......
This diff is collapsed.
......@@ -17,6 +17,7 @@ import (
"github.com/ethersphere/bee/pkg/addressbook"
"github.com/ethersphere/bee/pkg/api"
"github.com/ethersphere/bee/pkg/bzz"
"github.com/ethersphere/bee/pkg/crypto"
"github.com/ethersphere/bee/pkg/debugapi"
"github.com/ethersphere/bee/pkg/hive"
......@@ -133,8 +134,9 @@ func NewBee(o Options) (*Bee, error) {
}
b.stateStoreCloser = stateStore
addressbook := addressbook.New(stateStore)
signer := crypto.NewDefaultSigner(swarmPrivateKey)
p2ps, err := libp2p.New(p2pCtx, crypto.NewDefaultSigner(swarmPrivateKey), o.NetworkID, address, o.Addr, libp2p.Options{
p2ps, err := libp2p.New(p2pCtx, signer, o.NetworkID, address, o.Addr, libp2p.Options{
PrivateKey: libp2pPrivateKey,
DisableWS: o.DisableWS,
DisableQUIC: o.DisableQUIC,
......@@ -161,6 +163,7 @@ func NewBee(o Options) (*Bee, error) {
hive := hive.New(hive.Options{
Streamer: p2ps,
AddressBook: addressbook,
NetworkID: o.NetworkID,
Logger: logger,
})
......@@ -350,11 +353,19 @@ func NewBee(o Options) (*Bee, error) {
return
}
err = addressbook.Put(overlay, addr)
bzzAddr, err := bzz.NewAddress(signer, addr, overlay, o.NetworkID)
if err != nil {
_ = p2ps.Disconnect(overlay)
logger.Debugf("new bzz address error %s %s: %v", a, overlay, err)
logger.Errorf("connect to bootnode %s", a)
return
}
err = addressbook.Put(overlay, *bzzAddr)
if err != nil {
_ = p2ps.Disconnect(overlay)
logger.Debugf("addressbook error persisting %s %s: %v", a, overlay, err)
logger.Errorf("persisting node %s", a)
logger.Errorf("connect to bootnode %s", a)
return
}
......
......@@ -6,12 +6,12 @@ package handshake
import (
"bytes"
"encoding/binary"
"errors"
"fmt"
"sync"
"time"
"github.com/ethersphere/bee/pkg/bzz"
"github.com/ethersphere/bee/pkg/crypto"
"github.com/ethersphere/bee/pkg/logging"
"github.com/ethersphere/bee/pkg/p2p"
......@@ -21,6 +21,7 @@ import (
"github.com/libp2p/go-libp2p-core/network"
libp2ppeer "github.com/libp2p/go-libp2p-core/peer"
ma "github.com/multiformats/go-multiaddr"
)
const (
......@@ -37,8 +38,8 @@ var (
// ErrHandshakeDuplicate is returned if the handshake response has been received by an already processed peer.
ErrHandshakeDuplicate = errors.New("duplicate handshake")
// ErrInvalidSignature is returned if peer info was received with invalid signature
ErrInvalidSignature = errors.New("invalid signature")
// ErrInvalidBzzAddress is returned if peer info was received with invalid bzz address
ErrInvalidBzzAddress = errors.New("invalid bzz address")
// ErrInvalidAck is returned if ack does not match the syn provided
ErrInvalidAck = errors.New("invalid ack")
......@@ -50,39 +51,24 @@ type PeerFinder interface {
}
type Service struct {
overlay swarm.Address
underlay []byte
signature []byte
signer crypto.Signer
bzzAddress bzz.Address
networkID uint64
networkIDBytes []byte
receivedHandshakes map[libp2ppeer.ID]struct{}
receivedHandshakesMu sync.Mutex
logger logging.Logger
network.Notifiee // handhsake service can be the receiver for network.Notify
network.Notifiee // handshake service can be the receiver for network.Notify
}
func New(overlay swarm.Address, peerID libp2ppeer.ID, signer crypto.Signer, networkID uint64, logger logging.Logger) (*Service, error) {
underlay, err := peerID.MarshalBinary()
if err != nil {
return nil, err
}
networkIDBytes := make([]byte, 8)
binary.BigEndian.PutUint64(networkIDBytes, networkID)
signature, err := signer.Sign(append(underlay, networkIDBytes...))
func New(overlay swarm.Address, underlay ma.Multiaddr, signer crypto.Signer, networkID uint64, logger logging.Logger) (*Service, error) {
bzzAddress, err := bzz.NewAddress(signer, underlay, overlay, networkID)
if err != nil {
return nil, err
}
return &Service{
overlay: overlay,
underlay: underlay,
signature: signature,
signer: signer,
bzzAddress: *bzzAddress,
networkID: networkID,
networkIDBytes: networkIDBytes,
receivedHandshakes: make(map[libp2ppeer.ID]struct{}),
logger: logger,
Notifiee: new(network.NoopNotifiee),
......@@ -93,9 +79,9 @@ func (s *Service) Handshake(stream p2p.Stream) (i *Info, err error) {
w, r := protobuf.NewWriterAndReader(stream)
if err := w.WriteMsgWithTimeout(messageTimeout, &pb.Syn{
BzzAddress: &pb.BzzAddress{
Underlay: s.underlay,
Signature: s.signature,
Overlay: s.overlay.Bytes(),
Underlay: s.bzzAddress.Underlay.Bytes(),
Signature: s.bzzAddress.Signature,
Overlay: s.bzzAddress.Overlay.Bytes(),
},
NetworkID: s.networkID,
}); err != nil {
......@@ -111,8 +97,13 @@ func (s *Service) Handshake(stream p2p.Stream) (i *Info, err error) {
return nil, err
}
if err := s.checkSyn(resp.Syn); err != nil {
return nil, err
if resp.Syn.NetworkID != s.networkID {
return nil, ErrNetworkIDIncompatible
}
bzzAddress, err := bzz.ParseAddress(resp.Syn.BzzAddress.Underlay, resp.Syn.BzzAddress.Overlay, resp.Syn.BzzAddress.Signature, resp.Syn.NetworkID)
if err != nil {
return nil, ErrInvalidBzzAddress
}
if err := w.WriteMsgWithTimeout(messageTimeout, &pb.Ack{
......@@ -123,10 +114,8 @@ func (s *Service) Handshake(stream p2p.Stream) (i *Info, err error) {
s.logger.Tracef("handshake finished for peer %s", swarm.NewAddress(resp.Syn.BzzAddress.Overlay).String())
return &Info{
Overlay: swarm.NewAddress(resp.Syn.BzzAddress.Overlay),
Underlay: resp.Syn.BzzAddress.Underlay,
NetworkID: resp.Syn.NetworkID,
Light: resp.Syn.Light,
BzzAddress: bzzAddress,
Light: resp.Syn.Light,
}, nil
}
......@@ -146,16 +135,21 @@ func (s *Service) Handle(stream p2p.Stream, peerID libp2ppeer.ID) (i *Info, err
return nil, fmt.Errorf("read syn message: %w", err)
}
if err := s.checkSyn(&req); err != nil {
return nil, err
if req.NetworkID != s.networkID {
return nil, ErrNetworkIDIncompatible
}
bzzAddress, err := bzz.ParseAddress(req.BzzAddress.Underlay, req.BzzAddress.Overlay, req.BzzAddress.Signature, req.NetworkID)
if err != nil {
return nil, ErrInvalidBzzAddress
}
if err := w.WriteMsgWithTimeout(messageTimeout, &pb.SynAck{
Syn: &pb.Syn{
BzzAddress: &pb.BzzAddress{
Underlay: s.underlay,
Signature: s.signature,
Overlay: s.overlay.Bytes(),
Underlay: s.bzzAddress.Underlay.Bytes(),
Signature: s.bzzAddress.Signature,
Overlay: s.bzzAddress.Overlay.Bytes(),
},
NetworkID: s.networkID,
},
......@@ -173,12 +167,10 @@ func (s *Service) Handle(stream p2p.Stream, peerID libp2ppeer.ID) (i *Info, err
return nil, err
}
s.logger.Tracef("handshake finished for peer %s", req.BzzAddress.Overlay)
s.logger.Tracef("handshake finished for peer %s", swarm.NewAddress(req.BzzAddress.Overlay).String())
return &Info{
Overlay: swarm.NewAddress(req.BzzAddress.Overlay),
Underlay: req.BzzAddress.Underlay,
NetworkID: req.NetworkID,
Light: req.Light,
BzzAddress: bzzAddress,
Light: req.Light,
}, nil
}
......@@ -188,28 +180,10 @@ func (s *Service) Disconnected(_ network.Network, c network.Conn) {
delete(s.receivedHandshakes, c.RemotePeer())
}
func (s *Service) checkSyn(syn *pb.Syn) error {
if syn.NetworkID != s.networkID {
return ErrNetworkIDIncompatible
}
recoveredPK, err := crypto.Recover(syn.BzzAddress.Signature, append(syn.BzzAddress.Underlay, s.networkIDBytes...))
if err != nil {
return ErrInvalidSignature
}
recoveredOverlay := crypto.NewOverlayAddress(*recoveredPK, syn.NetworkID)
if !bytes.Equal(recoveredOverlay.Bytes(), syn.BzzAddress.Overlay) {
return ErrInvalidSignature
}
return nil
}
func (s *Service) checkAck(ack *pb.Ack) error {
if !bytes.Equal(ack.BzzAddress.Overlay, s.overlay.Bytes()) ||
!bytes.Equal(ack.BzzAddress.Underlay, s.underlay) ||
!bytes.Equal(ack.BzzAddress.Signature, s.signature) {
if !bytes.Equal(ack.BzzAddress.Overlay, s.bzzAddress.Overlay.Bytes()) ||
!bytes.Equal(ack.BzzAddress.Underlay, s.bzzAddress.Underlay.Bytes()) ||
!bytes.Equal(ack.BzzAddress.Signature, s.bzzAddress.Signature) {
return ErrInvalidAck
}
......@@ -217,8 +191,6 @@ func (s *Service) checkAck(ack *pb.Ack) error {
}
type Info struct {
Overlay swarm.Address
Underlay []byte
NetworkID uint64
Light bool
BzzAddress *bzz.Address
Light bool
}
......@@ -155,7 +155,13 @@ func New(ctx context.Context, signer beecrypto.Signer, networkID uint64, overlay
return nil, fmt.Errorf("autonat: %w", err)
}
handshakeService, err := handshake.New(overlay, h.ID(), signer, networkID, o.Logger)
// todo: handle different underlays
underlay, err := buildUnderlayAddress(h.Addrs()[0], h.ID())
if err != nil {
return nil, fmt.Errorf("build host multiaddress: %w", err)
}
handshakeService, err := handshake.New(overlay, underlay, signer, networkID, o.Logger)
if err != nil {
return nil, fmt.Errorf("handshake service: %w", err)
}
......@@ -193,21 +199,14 @@ func New(ctx context.Context, signer beecrypto.Signer, networkID uint64, overlay
return
}
if exists := s.peers.addIfNotExists(stream.Conn(), i.Overlay); exists {
if exists := s.peers.addIfNotExists(stream.Conn(), i.BzzAddress.Overlay); exists {
_ = stream.Close()
return
}
_ = stream.Close()
remoteMultiaddr, err := ma.NewMultiaddr(fmt.Sprintf("%s/p2p/%s", stream.Conn().RemoteMultiaddr().String(), peerID.Pretty()))
if err != nil {
s.logger.Debugf("multiaddr error: handle %s: %v", peerID, err)
s.logger.Errorf("unable to connect with peer %v", peerID)
_ = s.disconnect(peerID)
return
}
err = s.addressbook.Put(i.Overlay, remoteMultiaddr)
err = s.addressbook.Put(i.BzzAddress.Overlay, *i.BzzAddress)
if err != nil {
s.logger.Debugf("handshake: addressbook put error %s: %v", peerID, err)
s.logger.Errorf("unable to persist peer %v", peerID)
......@@ -216,13 +215,13 @@ func New(ctx context.Context, signer beecrypto.Signer, networkID uint64, overlay
}
if s.topologyNotifier != nil {
if err := s.topologyNotifier.Connected(ctx, i.Overlay); err != nil {
s.logger.Debugf("peerhandler error: %s: %v", peerID, err)
if err := s.topologyNotifier.Connected(ctx, i.BzzAddress.Overlay); err != nil {
s.logger.Debugf("topology notifier: %s: %v", peerID, err)
}
}
s.metrics.HandledStreamCount.Inc()
s.logger.Infof("peer %s connected", i.Overlay)
s.logger.Infof("peer %s connected", i.BzzAddress.Overlay)
})
h.Network().SetConnHandler(func(_ network.Conn) {
......@@ -287,19 +286,27 @@ func (s *Service) AddProtocol(p p2p.ProtocolSpec) (err error) {
return nil
}
func (s *Service) Addresses() (addrs []ma.Multiaddr, err error) {
func (s *Service) Addresses() (addreses []ma.Multiaddr, err error) {
for _, addr := range s.host.Addrs() {
a, err := buildUnderlayAddress(addr, s.host.ID())
if err != nil {
return nil, err
}
addreses = append(addreses, a)
}
return addreses, nil
}
func buildUnderlayAddress(addr ma.Multiaddr, peerID libp2ppeer.ID) (ma.Multiaddr, error) {
// Build host multiaddress
hostAddr, err := ma.NewMultiaddr(fmt.Sprintf("/p2p/%s", s.host.ID().Pretty()))
hostAddr, err := ma.NewMultiaddr(fmt.Sprintf("/p2p/%s", peerID.Pretty()))
if err != nil {
return nil, err
}
// Now we can build a full multiaddress to reach this host
// by encapsulating both addresses:
for _, addr := range s.host.Addrs() {
addrs = append(addrs, addr.Encapsulate(hostAddr))
}
return addrs, nil
return addr.Encapsulate(hostAddr), nil
}
func (s *Service) Connect(ctx context.Context, addr ma.Multiaddr) (overlay swarm.Address, err error) {
......@@ -332,12 +339,12 @@ func (s *Service) Connect(ctx context.Context, addr ma.Multiaddr) (overlay swarm
return swarm.Address{}, fmt.Errorf("handshake: %w", err)
}
if exists := s.peers.addIfNotExists(stream.Conn(), i.Overlay); exists {
if exists := s.peers.addIfNotExists(stream.Conn(), i.BzzAddress.Overlay); exists {
if err := helpers.FullClose(stream); err != nil {
return swarm.Address{}, err
}
return i.Overlay, nil
return i.BzzAddress.Overlay, nil
}
if err := helpers.FullClose(stream); err != nil {
......@@ -345,8 +352,8 @@ func (s *Service) Connect(ctx context.Context, addr ma.Multiaddr) (overlay swarm
}
s.metrics.CreatedConnectionCount.Inc()
s.logger.Infof("peer %s connected", i.Overlay)
return i.Overlay, nil
s.logger.Infof("peer %s connected", i.BzzAddress.Overlay)
return i.BzzAddress.Overlay, nil
}
func (s *Service) Disconnect(overlay swarm.Address) error {
......
......@@ -68,9 +68,8 @@ func (d *driver) AddPeer(ctx context.Context, addr swarm.Address) error {
d.receivedPeers[addr.ByteString()] = struct{}{}
d.mtx.Unlock()
connectedPeers := d.p2pService.Peers()
ma, err := d.addressBook.Get(addr)
bzzAddress, err := d.addressBook.Get(addr)
if err != nil {
if errors.Is(err, storage.ErrNotFound) {
return topology.ErrNotFound
......@@ -79,7 +78,7 @@ func (d *driver) AddPeer(ctx context.Context, addr swarm.Address) error {
}
if !isConnected(addr, connectedPeers) {
peerAddr, err := d.p2pService.Connect(ctx, ma)
_, err := d.p2pService.Connect(ctx, bzzAddress.Underlay)
if err != nil {
d.mtx.Lock()
delete(d.receivedPeers, addr.ByteString())
......@@ -91,15 +90,6 @@ func (d *driver) AddPeer(ctx context.Context, addr swarm.Address) error {
}
return err
}
// update addr if it is wrong or it has been changed
if !addr.Equal(peerAddr) {
addr = peerAddr
err := d.addressBook.Put(peerAddr, ma)
if err != nil {
return err
}
}
}
connectedAddrs := []swarm.Address{}
......@@ -192,7 +182,6 @@ func (d *driver) backoff(tryAfter time.Time) {
}
d.backoffActive = true
done := make(chan struct{})
ctx, cancel := context.WithCancel(context.Background())
go func() {
......@@ -205,13 +194,11 @@ func (d *driver) backoff(tryAfter time.Time) {
go func() {
defer func() { close(done) }()
select {
case <-time.After(time.Until(tryAfter)):
d.mtx.Lock()
d.backoffActive = false
d.mtx.Unlock()
addresses, _ := d.addressBook.Overlays()
for _, addr := range addresses {
select {
......
......@@ -12,6 +12,8 @@ import (
"testing"
"github.com/ethersphere/bee/pkg/addressbook"
"github.com/ethersphere/bee/pkg/bzz"
"github.com/ethersphere/bee/pkg/crypto"
"github.com/ethersphere/bee/pkg/discovery/mock"
"github.com/ethersphere/bee/pkg/logging"
"github.com/ethersphere/bee/pkg/p2p"
......@@ -25,8 +27,21 @@ import (
func TestAddPeer(t *testing.T) {
logger := logging.New(ioutil.Discard, 0)
underlay := "/ip4/127.0.0.1/tcp/7070/p2p/16Uiu2HAkx8ULY8cTXhdVAcMmLcH9AsTKz6uBQ7DPLKRjMLgBVYkS"
underlay, err := ma.NewMultiaddr("/ip4/127.0.0.1/tcp/7070/p2p/16Uiu2HAkx8ULY8cTXhdVAcMmLcH9AsTKz6uBQ7DPLKRjMLgBVYkS")
if err != nil {
t.Fatal(err)
}
overlay := swarm.MustParseHexAddress("ca1e9f3938cc1425c6061b96ad9eb93e134dfe8734ad490164ef20af9d1cf59a")
pk, err := crypto.GenerateSecp256k1Key()
if err != nil {
t.Fatal(err)
}
bzzAddr, err := bzz.NewAddress(crypto.NewDefaultSigner(pk), underlay, overlay, 1)
if err != nil {
t.Fatal(err)
}
connectedPeers := []p2p.Peer{
{
Address: swarm.MustParseHexAddress("ca1e9f3938cc1425c6061b96ad9eb93e134dfe8734ad490164ef20af9d1cf59b"),
......@@ -43,23 +58,18 @@ func TestAddPeer(t *testing.T) {
discovery := mock.NewDiscovery()
statestore := mockstate.NewStateStore()
ab := addressbook.New(statestore)
p2p := p2pmock.New(p2pmock.WithConnectFunc(func(_ context.Context, addr ma.Multiaddr) (swarm.Address, error) {
if addr.String() != underlay {
t.Fatalf("expected multiaddr %s, got %s", addr.String(), underlay)
if !addr.Equal(underlay) {
t.Fatalf("expected multiaddr %s, got %s", addr, underlay)
}
return overlay, nil
}))
fullDriver := full.New(discovery, ab, p2p, logger, overlay)
defer fullDriver.Close()
multiaddr, err := ma.NewMultiaddr(underlay)
if err != nil {
t.Fatal(err)
}
err = ab.Put(overlay, multiaddr)
if err != nil {
if err := ab.Put(overlay, *bzzAddr); err != nil {
t.Fatal(err)
}
......@@ -99,6 +109,10 @@ func TestAddPeer(t *testing.T) {
statestore := mockstate.NewStateStore()
ab := addressbook.New(statestore)
alreadyConnected := connectedPeers[0].Address
addrAlreadyConnected, err := bzz.NewAddress(crypto.NewDefaultSigner(pk), underlay, alreadyConnected, 1)
if err != nil {
t.Fatal(err)
}
p2p := p2pmock.New(p2pmock.WithConnectFunc(func(ctx context.Context, addr ma.Multiaddr) (swarm.Address, error) {
t.Fatal("should not be called")
......@@ -109,12 +123,8 @@ func TestAddPeer(t *testing.T) {
fullDriver := full.New(discovery, ab, p2p, logger, overlay)
defer fullDriver.Close()
multiaddr, err := ma.NewMultiaddr(underlay)
if err != nil {
t.Fatal("error creating multiaddr")
}
err = ab.Put(alreadyConnected, multiaddr)
err = ab.Put(alreadyConnected, *addrAlreadyConnected)
if err != nil {
t.Fatal(err)
}
......@@ -147,9 +157,10 @@ func TestAddPeer(t *testing.T) {
ab := addressbook.New(statestore)
p2ps := p2pmock.New(p2pmock.WithConnectFunc(func(ctx context.Context, addr ma.Multiaddr) (swarm.Address, error) {
if addr.String() != underlay {
if !addr.Equal(underlay) {
t.Fatalf("expected multiaddr %s, got %s", addr.String(), underlay)
}
return overlay, nil
}), p2pmock.WithPeersFunc(func() []p2p.Peer {
return connectedPeers
......@@ -157,13 +168,7 @@ func TestAddPeer(t *testing.T) {
fullDriver := full.New(discovery, ab, p2ps, logger, overlay)
defer fullDriver.Close()
multiaddr, err := ma.NewMultiaddr(underlay)
if err != nil {
t.Fatal(err)
}
err = ab.Put(overlay, multiaddr)
if err != nil {
if err := ab.Put(overlay, *bzzAddr); err != nil {
t.Fatal(err)
}
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment