Commit 179fa422 authored by Petar Radovic's avatar Petar Radovic Committed by GitHub

Fix bzz addr in debugapi (#245)

* fix bzz addr
parent 4eeb6fee
...@@ -27,7 +27,7 @@ func (s *server) peerConnectHandler(w http.ResponseWriter, r *http.Request) { ...@@ -27,7 +27,7 @@ func (s *server) peerConnectHandler(w http.ResponseWriter, r *http.Request) {
return return
} }
address, err := s.P2P.Connect(r.Context(), addr) bzzAddr, err := s.P2P.Connect(r.Context(), addr)
if err != nil { if err != nil {
s.Logger.Debugf("debug api: peer connect %s: %v", addr, err) s.Logger.Debugf("debug api: peer connect %s: %v", addr, err)
s.Logger.Errorf("unable to connect to peer %s", addr) s.Logger.Errorf("unable to connect to peer %s", addr)
...@@ -35,8 +35,23 @@ func (s *server) peerConnectHandler(w http.ResponseWriter, r *http.Request) { ...@@ -35,8 +35,23 @@ func (s *server) peerConnectHandler(w http.ResponseWriter, r *http.Request) {
return return
} }
err = s.Addressbook.Put(bzzAddr.Overlay, *bzzAddr)
if err != nil {
s.Logger.Debugf("debug api: addressbook.put %s: %v", addr, err)
s.Logger.Errorf("unable to persist peer %s", addr)
jsonhttp.InternalServerError(w, err)
return
}
if err := s.TopologyDriver.Connected(r.Context(), bzzAddr.Overlay); err != nil {
_ = s.P2P.Disconnect(bzzAddr.Overlay)
s.Logger.Debugf("debug api: topologyDriver.Connected %s: %v", addr, err)
s.Logger.Errorf("unable to connect to peer %s", addr)
jsonhttp.InternalServerError(w, err)
return
}
jsonhttp.OK(w, peerConnectResponse{ jsonhttp.OK(w, peerConnectResponse{
Address: address.String(), Address: bzzAddr.Overlay.String(),
}) })
} }
......
...@@ -10,27 +10,46 @@ import ( ...@@ -10,27 +10,46 @@ import (
"net/http" "net/http"
"testing" "testing"
"github.com/ethersphere/bee/pkg/bzz"
"github.com/ethersphere/bee/pkg/crypto"
"github.com/ethersphere/bee/pkg/debugapi" "github.com/ethersphere/bee/pkg/debugapi"
"github.com/ethersphere/bee/pkg/jsonhttp" "github.com/ethersphere/bee/pkg/jsonhttp"
"github.com/ethersphere/bee/pkg/jsonhttp/jsonhttptest" "github.com/ethersphere/bee/pkg/jsonhttp/jsonhttptest"
"github.com/ethersphere/bee/pkg/p2p" "github.com/ethersphere/bee/pkg/p2p"
"github.com/ethersphere/bee/pkg/p2p/mock" "github.com/ethersphere/bee/pkg/p2p/mock"
"github.com/ethersphere/bee/pkg/storage"
"github.com/ethersphere/bee/pkg/swarm" "github.com/ethersphere/bee/pkg/swarm"
topmock "github.com/ethersphere/bee/pkg/topology/mock"
ma "github.com/multiformats/go-multiaddr" ma "github.com/multiformats/go-multiaddr"
) )
func TestConnect(t *testing.T) { func TestConnect(t *testing.T) {
underlay := "/ip4/127.0.0.1/tcp/7070/p2p/16Uiu2HAkx8ULY8cTXhdVAcMmLcH9AsTKz6uBQ7DPLKRjMLgBVYkS" underlay := "/ip4/127.0.0.1/tcp/7070/p2p/16Uiu2HAkx8ULY8cTXhdVAcMmLcH9AsTKz6uBQ7DPLKRjMLgBVYkS"
errorUnderlay := "/ip4/127.0.0.1/tcp/7070/p2p/16Uiu2HAkw88cjH2orYrB6fDui4eUNdmgkwnDM8W681UbfsPgM9QY" errorUnderlay := "/ip4/127.0.0.1/tcp/7070/p2p/16Uiu2HAkw88cjH2orYrB6fDui4eUNdmgkwnDM8W681UbfsPgM9QY"
overlay := swarm.MustParseHexAddress("ca1e9f3938cc1425c6061b96ad9eb93e134dfe8734ad490164ef20af9d1cf59c")
testErr := errors.New("test error") testErr := errors.New("test error")
privateKey, err := crypto.GenerateSecp256k1Key()
if err != nil {
t.Fatal(err)
}
overlay := crypto.NewOverlayAddress(privateKey.PublicKey, 0)
underlama, err := ma.NewMultiaddr(underlay)
if err != nil {
t.Fatal(err)
}
bzzAddress, err := bzz.NewAddress(crypto.NewDefaultSigner(privateKey), underlama, overlay, 0)
if err != nil {
t.Fatal(err)
}
testServer := newTestServer(t, testServerOptions{ testServer := newTestServer(t, testServerOptions{
P2P: mock.New(mock.WithConnectFunc(func(ctx context.Context, addr ma.Multiaddr) (swarm.Address, error) { P2P: mock.New(mock.WithConnectFunc(func(ctx context.Context, addr ma.Multiaddr) (*bzz.Address, error) {
if addr.String() == errorUnderlay { if addr.String() == errorUnderlay {
return swarm.Address{}, testErr return nil, testErr
} }
return overlay, nil return bzzAddress, nil
})), })),
}) })
...@@ -38,6 +57,11 @@ func TestConnect(t *testing.T) { ...@@ -38,6 +57,11 @@ func TestConnect(t *testing.T) {
jsonhttptest.ResponseDirect(t, testServer.Client, http.MethodPost, "/connect"+underlay, nil, http.StatusOK, debugapi.PeerConnectResponse{ jsonhttptest.ResponseDirect(t, testServer.Client, http.MethodPost, "/connect"+underlay, nil, http.StatusOK, debugapi.PeerConnectResponse{
Address: overlay.String(), Address: overlay.String(),
}) })
bzzAddr, err := testServer.Addressbook.Get(overlay)
if err != nil && errors.Is(err, storage.ErrNotFound) && !bzzAddress.Equal(&bzzAddr) {
t.Fatalf("found wrong underlay. expected: %+v, found: %+v", bzzAddress, bzzAddr)
}
}) })
t.Run("error", func(t *testing.T) { t.Run("error", func(t *testing.T) {
...@@ -53,6 +77,37 @@ func TestConnect(t *testing.T) { ...@@ -53,6 +77,37 @@ func TestConnect(t *testing.T) {
Message: http.StatusText(http.StatusMethodNotAllowed), Message: http.StatusText(http.StatusMethodNotAllowed),
}) })
}) })
t.Run("error - add peer", func(t *testing.T) {
disconnectCalled := false
testServer := newTestServer(t, testServerOptions{
P2P: mock.New(mock.WithConnectFunc(func(ctx context.Context, addr ma.Multiaddr) (*bzz.Address, error) {
if addr.String() == errorUnderlay {
return nil, testErr
}
return bzzAddress, nil
}), mock.WithDisconnectFunc(func(addr swarm.Address) error {
disconnectCalled = true
return nil
})),
TopologyOpts: []topmock.Option{topmock.WithAddPeerErr(testErr)},
})
jsonhttptest.ResponseDirect(t, testServer.Client, http.MethodPost, "/connect"+underlay, nil, http.StatusInternalServerError, jsonhttp.StatusResponse{
Code: http.StatusInternalServerError,
Message: testErr.Error(),
})
bzzAddr, err := testServer.Addressbook.Get(overlay)
if err != nil && errors.Is(err, storage.ErrNotFound) && !bzzAddress.Equal(&bzzAddr) {
t.Fatalf("found wrong underlay. expected: %+v, found: %+v", bzzAddress, bzzAddr)
}
if !disconnectCalled {
t.Fatalf("disconnect not called.")
}
})
} }
func TestDisconnect(t *testing.T) { func TestDisconnect(t *testing.T) {
......
...@@ -138,7 +138,8 @@ func (k *Kad) manage() { ...@@ -138,7 +138,8 @@ func (k *Kad) manage() {
err = k.connect(ctx, peer, bzzAddr.Underlay, po) err = k.connect(ctx, peer, bzzAddr.Underlay, po)
if err != nil { if err != nil {
k.logger.Errorf("error connecting to peer %s: %v", peer, err) k.logger.Debugf("error connecting to peer from kademlia %s %+v: %v", peer, bzzAddr, err)
k.logger.Errorf("connecting to peer %s: %v", peer, err)
// continue to next // continue to next
return false, false, nil return false, false, nil
} }
......
...@@ -444,11 +444,11 @@ func newTestKademlia(connCounter *int32, f func(bin, depth uint8, peers *pslice. ...@@ -444,11 +444,11 @@ func newTestKademlia(connCounter *int32, f func(bin, depth uint8, peers *pslice.
} }
func p2pMock(counter *int32) p2p.Service { func p2pMock(counter *int32) p2p.Service {
p2ps := p2pmock.New(p2pmock.WithConnectFunc(func(ctx context.Context, addr ma.Multiaddr) (swarm.Address, error) { p2ps := p2pmock.New(p2pmock.WithConnectFunc(func(ctx context.Context, addr ma.Multiaddr) (*bzz.Address, error) {
if counter != nil { if counter != nil {
_ = atomic.AddInt32(counter, 1) _ = atomic.AddInt32(counter, 1)
} }
return swarm.ZeroAddress, nil return nil, nil
})) }))
return p2ps return p2ps
......
...@@ -17,7 +17,6 @@ import ( ...@@ -17,7 +17,6 @@ import (
"github.com/ethersphere/bee/pkg/addressbook" "github.com/ethersphere/bee/pkg/addressbook"
"github.com/ethersphere/bee/pkg/api" "github.com/ethersphere/bee/pkg/api"
"github.com/ethersphere/bee/pkg/bzz"
"github.com/ethersphere/bee/pkg/crypto" "github.com/ethersphere/bee/pkg/crypto"
"github.com/ethersphere/bee/pkg/debugapi" "github.com/ethersphere/bee/pkg/debugapi"
"github.com/ethersphere/bee/pkg/hive" "github.com/ethersphere/bee/pkg/hive"
...@@ -346,32 +345,24 @@ func NewBee(o Options) (*Bee, error) { ...@@ -346,32 +345,24 @@ func NewBee(o Options) (*Bee, error) {
return return
} }
overlay, err := p2ps.Connect(p2pCtx, addr) bzzAddr, err := p2ps.Connect(p2pCtx, addr)
if err != nil { if err != nil {
logger.Debugf("connect fail %s: %v", a, err) logger.Debugf("connect fail %s: %v", a, err)
logger.Errorf("connect to bootnode %s", a) logger.Errorf("connect to bootnode %s", a)
return return
} }
bzzAddr, err := bzz.NewAddress(signer, addr, overlay, o.NetworkID) err = addressbook.Put(bzzAddr.Overlay, *bzzAddr)
if err != nil { if err != nil {
_ = p2ps.Disconnect(overlay) _ = p2ps.Disconnect(bzzAddr.Overlay)
logger.Debugf("new bzz address error %s %s: %v", a, overlay, err) logger.Debugf("addressbook error persisting %s %s: %v", a, bzzAddr.Overlay, err)
logger.Errorf("connect to bootnode %s", a) logger.Errorf("connect to bootnode %s", a)
return return
} }
err = addressbook.Put(overlay, *bzzAddr) if err := topologyDriver.Connected(p2pCtx, bzzAddr.Overlay); err != nil {
if err != nil { _ = p2ps.Disconnect(bzzAddr.Overlay)
_ = p2ps.Disconnect(overlay) logger.Debugf("topology connected fail %s %s: %v", a, bzzAddr.Overlay, err)
logger.Debugf("addressbook error persisting %s %s: %v", a, overlay, err)
logger.Errorf("connect to bootnode %s", a)
return
}
if err := topologyDriver.AddPeer(p2pCtx, overlay); err != nil {
_ = p2ps.Disconnect(overlay)
logger.Debugf("topology add peer fail %s %s: %v", a, overlay, err)
logger.Errorf("connect to bootnode %s", a) logger.Errorf("connect to bootnode %s", a)
return return
} }
......
...@@ -41,7 +41,7 @@ func TestConnectDisconnect(t *testing.T) { ...@@ -41,7 +41,7 @@ func TestConnectDisconnect(t *testing.T) {
addr := serviceUnderlayAddress(t, s1) addr := serviceUnderlayAddress(t, s1)
overlay, err := s2.Connect(ctx, addr) bzzAddr, err := s2.Connect(ctx, addr)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
...@@ -49,7 +49,7 @@ func TestConnectDisconnect(t *testing.T) { ...@@ -49,7 +49,7 @@ func TestConnectDisconnect(t *testing.T) {
expectPeers(t, s2, overlay1) expectPeers(t, s2, overlay1)
expectPeersEventually(t, s1, overlay2) expectPeersEventually(t, s1, overlay2)
if err := s2.Disconnect(overlay); err != nil { if err := s2.Disconnect(bzzAddr.Overlay); err != nil {
t.Fatal(err) t.Fatal(err)
} }
...@@ -92,7 +92,7 @@ func TestDoubleDisconnect(t *testing.T) { ...@@ -92,7 +92,7 @@ func TestDoubleDisconnect(t *testing.T) {
addr := serviceUnderlayAddress(t, s1) addr := serviceUnderlayAddress(t, s1)
overlay, err := s2.Connect(ctx, addr) bzzAddr, err := s2.Connect(ctx, addr)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
...@@ -100,14 +100,14 @@ func TestDoubleDisconnect(t *testing.T) { ...@@ -100,14 +100,14 @@ func TestDoubleDisconnect(t *testing.T) {
expectPeers(t, s2, overlay1) expectPeers(t, s2, overlay1)
expectPeersEventually(t, s1, overlay2) expectPeersEventually(t, s1, overlay2)
if err := s2.Disconnect(overlay); err != nil { if err := s2.Disconnect(bzzAddr.Overlay); err != nil {
t.Fatal(err) t.Fatal(err)
} }
expectPeers(t, s2) expectPeers(t, s2)
expectPeersEventually(t, s1) expectPeersEventually(t, s1)
if err := s2.Disconnect(overlay); !errors.Is(err, p2p.ErrPeerNotFound) { if err := s2.Disconnect(bzzAddr.Overlay); !errors.Is(err, p2p.ErrPeerNotFound) {
t.Errorf("got error %v, want %v", err, p2p.ErrPeerNotFound) t.Errorf("got error %v, want %v", err, p2p.ErrPeerNotFound)
} }
...@@ -125,7 +125,7 @@ func TestMultipleConnectDisconnect(t *testing.T) { ...@@ -125,7 +125,7 @@ func TestMultipleConnectDisconnect(t *testing.T) {
addr := serviceUnderlayAddress(t, s1) addr := serviceUnderlayAddress(t, s1)
overlay, err := s2.Connect(ctx, addr) bzzAddr, err := s2.Connect(ctx, addr)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
...@@ -133,14 +133,14 @@ func TestMultipleConnectDisconnect(t *testing.T) { ...@@ -133,14 +133,14 @@ func TestMultipleConnectDisconnect(t *testing.T) {
expectPeers(t, s2, overlay1) expectPeers(t, s2, overlay1)
expectPeersEventually(t, s1, overlay2) expectPeersEventually(t, s1, overlay2)
if err := s2.Disconnect(overlay); err != nil { if err := s2.Disconnect(bzzAddr.Overlay); err != nil {
t.Fatal(err) t.Fatal(err)
} }
expectPeers(t, s2) expectPeers(t, s2)
expectPeersEventually(t, s1) expectPeersEventually(t, s1)
overlay, err = s2.Connect(ctx, addr) bzzAddr, err = s2.Connect(ctx, addr)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
...@@ -148,7 +148,7 @@ func TestMultipleConnectDisconnect(t *testing.T) { ...@@ -148,7 +148,7 @@ func TestMultipleConnectDisconnect(t *testing.T) {
expectPeers(t, s2, overlay1) expectPeers(t, s2, overlay1)
expectPeersEventually(t, s1, overlay2) expectPeersEventually(t, s1, overlay2)
if err := s2.Disconnect(overlay); err != nil { if err := s2.Disconnect(bzzAddr.Overlay); err != nil {
t.Fatal(err) t.Fatal(err)
} }
...@@ -169,7 +169,7 @@ func TestConnectDisconnectOnAllAddresses(t *testing.T) { ...@@ -169,7 +169,7 @@ func TestConnectDisconnectOnAllAddresses(t *testing.T) {
t.Fatal(err) t.Fatal(err)
} }
for _, addr := range addrs { for _, addr := range addrs {
overlay, err := s2.Connect(ctx, addr) bzzAddr, err := s2.Connect(ctx, addr)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
...@@ -177,7 +177,7 @@ func TestConnectDisconnectOnAllAddresses(t *testing.T) { ...@@ -177,7 +177,7 @@ func TestConnectDisconnectOnAllAddresses(t *testing.T) {
expectPeers(t, s2, overlay1) expectPeers(t, s2, overlay1)
expectPeersEventually(t, s1, overlay2) expectPeersEventually(t, s1, overlay2)
if err := s2.Disconnect(overlay); err != nil { if err := s2.Disconnect(bzzAddr.Overlay); err != nil {
t.Fatal(err) t.Fatal(err)
} }
...@@ -349,7 +349,7 @@ func TestTopologyNotifiee(t *testing.T) { ...@@ -349,7 +349,7 @@ func TestTopologyNotifiee(t *testing.T) {
addr := serviceUnderlayAddress(t, s1) addr := serviceUnderlayAddress(t, s1)
// s2 connects to s1, thus the notifiee on s1 should be called on Connect // s2 connects to s1, thus the notifiee on s1 should be called on Connect
overlay, err := s2.Connect(ctx, addr) bzzAddr, err := s2.Connect(ctx, addr)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
...@@ -365,7 +365,7 @@ func TestTopologyNotifiee(t *testing.T) { ...@@ -365,7 +365,7 @@ func TestTopologyNotifiee(t *testing.T) {
mtx.Unlock() mtx.Unlock()
// s2 disconnects from s1 so s1 disconnect notifiee should be called // s2 disconnects from s1 so s1 disconnect notifiee should be called
if err := s2.Disconnect(overlay); err != nil { if err := s2.Disconnect(bzzAddr.Overlay); err != nil {
t.Fatal(err) t.Fatal(err)
} }
...@@ -384,7 +384,7 @@ func TestTopologyNotifiee(t *testing.T) { ...@@ -384,7 +384,7 @@ func TestTopologyNotifiee(t *testing.T) {
addr2 := serviceUnderlayAddress(t, s2) addr2 := serviceUnderlayAddress(t, s2)
// s1 connects to s2, thus the notifiee on s2 should be called on Connect // s1 connects to s2, thus the notifiee on s2 should be called on Connect
o2, err := s1.Connect(ctx, addr2) bzzAddr2, err := s1.Connect(ctx, addr2)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
...@@ -394,7 +394,7 @@ func TestTopologyNotifiee(t *testing.T) { ...@@ -394,7 +394,7 @@ func TestTopologyNotifiee(t *testing.T) {
waitAddrSet(t, &n2connectedAddr, &mtx, overlay1) waitAddrSet(t, &n2connectedAddr, &mtx, overlay1)
// s1 disconnects from s2 so s2 disconnect notifiee should be called // s1 disconnects from s2 so s2 disconnect notifiee should be called
if err := s1.Disconnect(o2); err != nil { if err := s1.Disconnect(bzzAddr2.Overlay); err != nil {
t.Fatal(err) t.Fatal(err)
} }
expectPeers(t, s1) expectPeers(t, s1)
......
...@@ -12,6 +12,7 @@ import ( ...@@ -12,6 +12,7 @@ import (
"net" "net"
"github.com/ethersphere/bee/pkg/addressbook" "github.com/ethersphere/bee/pkg/addressbook"
"github.com/ethersphere/bee/pkg/bzz"
beecrypto "github.com/ethersphere/bee/pkg/crypto" beecrypto "github.com/ethersphere/bee/pkg/crypto"
"github.com/ethersphere/bee/pkg/logging" "github.com/ethersphere/bee/pkg/logging"
"github.com/ethersphere/bee/pkg/p2p" "github.com/ethersphere/bee/pkg/p2p"
...@@ -156,7 +157,7 @@ func New(ctx context.Context, signer beecrypto.Signer, networkID uint64, overlay ...@@ -156,7 +157,7 @@ func New(ctx context.Context, signer beecrypto.Signer, networkID uint64, overlay
} }
// todo: handle different underlays // todo: handle different underlays
underlay, err := buildUnderlayAddress(h.Addrs()[0], h.ID()) underlay, err := buildUnderlayAddress(h.Addrs()[1], h.ID())
if err != nil { if err != nil {
return nil, fmt.Errorf("build host multiaddress: %w", err) return nil, fmt.Errorf("build host multiaddress: %w", err)
} }
...@@ -309,51 +310,51 @@ func buildUnderlayAddress(addr ma.Multiaddr, peerID libp2ppeer.ID) (ma.Multiaddr ...@@ -309,51 +310,51 @@ func buildUnderlayAddress(addr ma.Multiaddr, peerID libp2ppeer.ID) (ma.Multiaddr
return addr.Encapsulate(hostAddr), nil return addr.Encapsulate(hostAddr), nil
} }
func (s *Service) Connect(ctx context.Context, addr ma.Multiaddr) (overlay swarm.Address, err error) { func (s *Service) Connect(ctx context.Context, addr ma.Multiaddr) (address *bzz.Address, err error) {
// Extract the peer ID from the multiaddr. // Extract the peer ID from the multiaddr.
info, err := libp2ppeer.AddrInfoFromP2pAddr(addr) info, err := libp2ppeer.AddrInfoFromP2pAddr(addr)
if err != nil { if err != nil {
return swarm.Address{}, err return nil, err
} }
if _, found := s.peers.overlay(info.ID); found { if _, found := s.peers.overlay(info.ID); found {
return swarm.Address{}, p2p.ErrAlreadyConnected return nil, p2p.ErrAlreadyConnected
} }
if err := s.conectionBreaker.Execute(func() error { return s.host.Connect(ctx, *info) }); err != nil { if err := s.conectionBreaker.Execute(func() error { return s.host.Connect(ctx, *info) }); err != nil {
if errors.Is(err, breaker.ErrClosed) { if errors.Is(err, breaker.ErrClosed) {
return swarm.Address{}, p2p.NewConnectionBackoffError(err, s.conectionBreaker.ClosedUntil()) return nil, p2p.NewConnectionBackoffError(err, s.conectionBreaker.ClosedUntil())
} }
return swarm.Address{}, err return nil, err
} }
stream, err := s.newStreamForPeerID(ctx, info.ID, handshake.ProtocolName, handshake.ProtocolVersion, handshake.StreamName) stream, err := s.newStreamForPeerID(ctx, info.ID, handshake.ProtocolName, handshake.ProtocolVersion, handshake.StreamName)
if err != nil { if err != nil {
_ = s.disconnect(info.ID) _ = s.disconnect(info.ID)
return swarm.Address{}, err return nil, err
} }
i, err := s.handshakeService.Handshake(NewStream(stream)) i, err := s.handshakeService.Handshake(NewStream(stream))
if err != nil { if err != nil {
_ = s.disconnect(info.ID) _ = s.disconnect(info.ID)
return swarm.Address{}, fmt.Errorf("handshake: %w", err) return nil, fmt.Errorf("handshake: %w", err)
} }
if exists := s.peers.addIfNotExists(stream.Conn(), i.BzzAddress.Overlay); exists { if exists := s.peers.addIfNotExists(stream.Conn(), i.BzzAddress.Overlay); exists {
if err := helpers.FullClose(stream); err != nil { if err := helpers.FullClose(stream); err != nil {
return swarm.Address{}, err return nil, err
} }
return i.BzzAddress.Overlay, nil return i.BzzAddress, nil
} }
if err := helpers.FullClose(stream); err != nil { if err := helpers.FullClose(stream); err != nil {
return swarm.Address{}, err return nil, err
} }
s.metrics.CreatedConnectionCount.Inc() s.metrics.CreatedConnectionCount.Inc()
s.logger.Infof("peer %s connected", i.BzzAddress.Overlay) s.logger.Infof("peer %s connected", i.BzzAddress.Overlay)
return i.BzzAddress.Overlay, nil return i.BzzAddress, nil
} }
func (s *Service) Disconnect(overlay swarm.Address) error { func (s *Service) Disconnect(overlay swarm.Address) error {
......
...@@ -8,6 +8,7 @@ import ( ...@@ -8,6 +8,7 @@ import (
"context" "context"
"errors" "errors"
"github.com/ethersphere/bee/pkg/bzz"
"github.com/ethersphere/bee/pkg/p2p" "github.com/ethersphere/bee/pkg/p2p"
"github.com/ethersphere/bee/pkg/swarm" "github.com/ethersphere/bee/pkg/swarm"
"github.com/ethersphere/bee/pkg/topology" "github.com/ethersphere/bee/pkg/topology"
...@@ -16,7 +17,7 @@ import ( ...@@ -16,7 +17,7 @@ import (
type Service struct { type Service struct {
addProtocolFunc func(p2p.ProtocolSpec) error addProtocolFunc func(p2p.ProtocolSpec) error
connectFunc func(ctx context.Context, addr ma.Multiaddr) (overlay swarm.Address, err error) connectFunc func(ctx context.Context, addr ma.Multiaddr) (address *bzz.Address, err error)
disconnectFunc func(overlay swarm.Address) error disconnectFunc func(overlay swarm.Address) error
peersFunc func() []p2p.Peer peersFunc func() []p2p.Peer
setNotifierFunc func(topology.Notifier) setNotifierFunc func(topology.Notifier)
...@@ -29,7 +30,7 @@ func WithAddProtocolFunc(f func(p2p.ProtocolSpec) error) Option { ...@@ -29,7 +30,7 @@ func WithAddProtocolFunc(f func(p2p.ProtocolSpec) error) Option {
}) })
} }
func WithConnectFunc(f func(ctx context.Context, addr ma.Multiaddr) (overlay swarm.Address, err error)) Option { func WithConnectFunc(f func(ctx context.Context, addr ma.Multiaddr) (address *bzz.Address, err error)) Option {
return optionFunc(func(s *Service) { return optionFunc(func(s *Service) {
s.connectFunc = f s.connectFunc = f
}) })
...@@ -74,9 +75,9 @@ func (s *Service) AddProtocol(spec p2p.ProtocolSpec) error { ...@@ -74,9 +75,9 @@ func (s *Service) AddProtocol(spec p2p.ProtocolSpec) error {
return s.addProtocolFunc(spec) return s.addProtocolFunc(spec)
} }
func (s *Service) Connect(ctx context.Context, addr ma.Multiaddr) (overlay swarm.Address, err error) { func (s *Service) Connect(ctx context.Context, addr ma.Multiaddr) (address *bzz.Address, err error) {
if s.connectFunc == nil { if s.connectFunc == nil {
return swarm.Address{}, errors.New("function Connect not configured") return nil, errors.New("function Connect not configured")
} }
return s.connectFunc(ctx, addr) return s.connectFunc(ctx, addr)
} }
......
...@@ -8,6 +8,7 @@ import ( ...@@ -8,6 +8,7 @@ import (
"context" "context"
"io" "io"
"github.com/ethersphere/bee/pkg/bzz"
"github.com/ethersphere/bee/pkg/swarm" "github.com/ethersphere/bee/pkg/swarm"
"github.com/ethersphere/bee/pkg/topology" "github.com/ethersphere/bee/pkg/topology"
ma "github.com/multiformats/go-multiaddr" ma "github.com/multiformats/go-multiaddr"
...@@ -16,7 +17,7 @@ import ( ...@@ -16,7 +17,7 @@ import (
// Service provides methods to handle p2p Peers and Protocols. // Service provides methods to handle p2p Peers and Protocols.
type Service interface { type Service interface {
AddProtocol(ProtocolSpec) error AddProtocol(ProtocolSpec) error
Connect(ctx context.Context, addr ma.Multiaddr) (overlay swarm.Address, err error) Connect(ctx context.Context, addr ma.Multiaddr) (address *bzz.Address, err error)
Disconnect(overlay swarm.Address) error Disconnect(overlay swarm.Address) error
Peers() []Peer Peers() []Peer
SetNotifier(topology.Notifier) SetNotifier(topology.Notifier)
......
...@@ -58,12 +58,12 @@ func TestAddPeer(t *testing.T) { ...@@ -58,12 +58,12 @@ func TestAddPeer(t *testing.T) {
discovery := mock.NewDiscovery() discovery := mock.NewDiscovery()
statestore := mockstate.NewStateStore() statestore := mockstate.NewStateStore()
ab := addressbook.New(statestore) ab := addressbook.New(statestore)
p2p := p2pmock.New(p2pmock.WithConnectFunc(func(_ context.Context, addr ma.Multiaddr) (swarm.Address, error) { p2p := p2pmock.New(p2pmock.WithConnectFunc(func(_ context.Context, addr ma.Multiaddr) (*bzz.Address, error) {
if !addr.Equal(underlay) { if !addr.Equal(underlay) {
t.Fatalf("expected multiaddr %s, got %s", addr, underlay) t.Fatalf("expected multiaddr %s, got %s", addr, underlay)
} }
return overlay, nil return bzzAddr, nil
})) }))
fullDriver := full.New(discovery, ab, p2p, logger, overlay) fullDriver := full.New(discovery, ab, p2p, logger, overlay)
...@@ -87,9 +87,9 @@ func TestAddPeer(t *testing.T) { ...@@ -87,9 +87,9 @@ func TestAddPeer(t *testing.T) {
discovery := mock.NewDiscovery() discovery := mock.NewDiscovery()
statestore := mockstate.NewStateStore() statestore := mockstate.NewStateStore()
ab := addressbook.New(statestore) ab := addressbook.New(statestore)
p2p := p2pmock.New(p2pmock.WithConnectFunc(func(ctx context.Context, addr ma.Multiaddr) (swarm.Address, error) { p2p := p2pmock.New(p2pmock.WithConnectFunc(func(ctx context.Context, addr ma.Multiaddr) (*bzz.Address, error) {
t.Fatal("should not be called") t.Fatal("should not be called")
return swarm.Address{}, nil return nil, nil
})) }))
fullDriver := full.New(discovery, ab, p2p, logger, overlay) fullDriver := full.New(discovery, ab, p2p, logger, overlay)
...@@ -114,9 +114,9 @@ func TestAddPeer(t *testing.T) { ...@@ -114,9 +114,9 @@ func TestAddPeer(t *testing.T) {
t.Fatal(err) t.Fatal(err)
} }
p2p := p2pmock.New(p2pmock.WithConnectFunc(func(ctx context.Context, addr ma.Multiaddr) (swarm.Address, error) { p2p := p2pmock.New(p2pmock.WithConnectFunc(func(ctx context.Context, addr ma.Multiaddr) (*bzz.Address, error) {
t.Fatal("should not be called") t.Fatal("should not be called")
return swarm.Address{}, nil return nil, nil
}), p2pmock.WithPeersFunc(func() []p2p.Peer { }), p2pmock.WithPeersFunc(func() []p2p.Peer {
return connectedPeers return connectedPeers
})) }))
...@@ -156,12 +156,12 @@ func TestAddPeer(t *testing.T) { ...@@ -156,12 +156,12 @@ func TestAddPeer(t *testing.T) {
statestore := mockstate.NewStateStore() statestore := mockstate.NewStateStore()
ab := addressbook.New(statestore) ab := addressbook.New(statestore)
p2ps := p2pmock.New(p2pmock.WithConnectFunc(func(ctx context.Context, addr ma.Multiaddr) (swarm.Address, error) { p2ps := p2pmock.New(p2pmock.WithConnectFunc(func(ctx context.Context, addr ma.Multiaddr) (*bzz.Address, error) {
if !addr.Equal(underlay) { if !addr.Equal(underlay) {
t.Fatalf("expected multiaddr %s, got %s", addr.String(), underlay) t.Fatalf("expected multiaddr %s, got %s", addr.String(), underlay)
} }
return overlay, nil return bzzAddr, nil
}), p2pmock.WithPeersFunc(func() []p2p.Peer { }), p2pmock.WithPeersFunc(func() []p2p.Peer {
return connectedPeers return connectedPeers
})) }))
...@@ -199,6 +199,10 @@ func TestAddPeer(t *testing.T) { ...@@ -199,6 +199,10 @@ func TestAddPeer(t *testing.T) {
func TestClosestPeer(t *testing.T) { func TestClosestPeer(t *testing.T) {
logger := logging.New(ioutil.Discard, 0) logger := logging.New(ioutil.Discard, 0)
baseOverlay := swarm.MustParseHexAddress("0000000000000000000000000000000000000000000000000000000000000000") // base is 0000 baseOverlay := swarm.MustParseHexAddress("0000000000000000000000000000000000000000000000000000000000000000") // base is 0000
bzzAddr := &bzz.Address{
Overlay: baseOverlay,
}
connectedPeers := []p2p.Peer{ connectedPeers := []p2p.Peer{
{ {
Address: swarm.MustParseHexAddress("8000000000000000000000000000000000000000000000000000000000000000"), // binary 1000 -> po 0 to base Address: swarm.MustParseHexAddress("8000000000000000000000000000000000000000000000000000000000000000"), // binary 1000 -> po 0 to base
...@@ -215,8 +219,8 @@ func TestClosestPeer(t *testing.T) { ...@@ -215,8 +219,8 @@ func TestClosestPeer(t *testing.T) {
statestore := mockstate.NewStateStore() statestore := mockstate.NewStateStore()
ab := addressbook.New(statestore) ab := addressbook.New(statestore)
p2ps := p2pmock.New(p2pmock.WithConnectFunc(func(ctx context.Context, addr ma.Multiaddr) (swarm.Address, error) { p2ps := p2pmock.New(p2pmock.WithConnectFunc(func(ctx context.Context, addr ma.Multiaddr) (*bzz.Address, error) {
return baseOverlay, nil return bzzAddr, nil
}), p2pmock.WithPeersFunc(func() []p2p.Peer { }), p2pmock.WithPeersFunc(func() []p2p.Peer {
return connectedPeers return connectedPeers
})) }))
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment