Commit 73654962 authored by Anatolie Lupacescu's avatar Anatolie Lupacescu Committed by GitHub

fix: add Ethereum address field (#2126)

parent eb445d9c
......@@ -28,10 +28,11 @@ var ErrInvalidAddress = errors.New("invalid address")
// It consists of a peers underlay (physical) address, overlay (topology) address and signature.
// Signature is used to verify the `Overlay/Underlay` pair, as it is based on `underlay|networkID`, signed with the public key of Overlay address
type Address struct {
Underlay ma.Multiaddr
Overlay swarm.Address
Signature []byte
Transaction []byte
Underlay ma.Multiaddr
Overlay swarm.Address
Signature []byte
Transaction []byte
EthereumAddress []byte
}
type addressJSON struct {
......@@ -79,11 +80,17 @@ func ParseAddress(underlay, overlay, signature, trxHash, blockHash []byte, netwo
return nil, ErrInvalidAddress
}
ethAddress, err := crypto.NewEthereumAddress(*recoveredPK)
if err != nil {
return nil, fmt.Errorf("extract ethereum address: %v: %w", err, ErrInvalidAddress)
}
return &Address{
Underlay: multiUnderlay,
Overlay: swarm.NewAddress(overlay),
Signature: signature,
Transaction: trxHash,
Underlay: multiUnderlay,
Overlay: swarm.NewAddress(overlay),
Signature: signature,
Transaction: trxHash,
EthereumAddress: ethAddress,
}, nil
}
......
......@@ -63,13 +63,19 @@ func (s *Service) peerDisconnectHandler(w http.ResponseWriter, r *http.Request)
jsonhttp.OK(w, nil)
}
// Peer holds information about a Peer.
type Peer struct {
Address swarm.Address `json:"address"`
FullNode bool `json:"fullNode"`
}
type peersResponse struct {
Peers []p2p.Peer `json:"peers"`
Peers []Peer `json:"peers"`
}
func (s *Service) peersHandler(w http.ResponseWriter, r *http.Request) {
jsonhttp.OK(w, peersResponse{
Peers: s.p2p.Peers(),
Peers: mapPeers(s.p2p.Peers()),
})
}
......@@ -82,6 +88,16 @@ func (s *Service) blocklistedPeersHandler(w http.ResponseWriter, r *http.Request
}
jsonhttp.OK(w, peersResponse{
Peers: peers,
Peers: mapPeers(peers),
})
}
func mapPeers(peers []p2p.Peer) (out []Peer) {
for _, peer := range peers {
out = append(out, Peer{
Address: peer.Address,
FullNode: peer.FullNode,
})
}
return
}
......@@ -170,7 +170,7 @@ func TestPeer(t *testing.T) {
t.Run("ok", func(t *testing.T) {
jsonhttptest.Request(t, testServer.Client, http.MethodGet, "/peers", http.StatusOK,
jsonhttptest.WithExpectedJSONResponse(debugapi.PeersResponse{
Peers: []p2p.Peer{{Address: overlay}},
Peers: []debugapi.Peer{{Address: overlay}},
}),
)
})
......@@ -195,7 +195,7 @@ func TestBlocklistedPeers(t *testing.T) {
jsonhttptest.Request(t, testServer.Client, http.MethodGet, "/blocklist", http.StatusOK,
jsonhttptest.WithExpectedJSONResponse(debugapi.PeersResponse{
Peers: []p2p.Peer{{Address: overlay}},
Peers: []debugapi.Peer{{Address: overlay}},
}),
)
}
......
......@@ -345,7 +345,7 @@ func (s *Service) handleIncoming(stream network.Stream) {
}
}
peer := p2p.Peer{Address: overlay, FullNode: i.FullNode}
peer := p2p.Peer{Address: overlay, FullNode: i.FullNode, EthereumAddress: i.BzzAddress.EthereumAddress}
s.protocolsmu.RLock()
for _, tn := range s.protocols {
......@@ -639,7 +639,7 @@ func (s *Service) Connect(ctx context.Context, addr ma.Multiaddr) (address *bzz.
s.protocolsmu.RLock()
for _, tn := range s.protocols {
if tn.ConnectOut != nil {
if err := tn.ConnectOut(ctx, p2p.Peer{Address: overlay, FullNode: i.FullNode}); err != nil {
if err := tn.ConnectOut(ctx, p2p.Peer{Address: overlay, FullNode: i.FullNode, EthereumAddress: i.BzzAddress.EthereumAddress}); err != nil {
s.logger.Debugf("connectOut: protocol: %s, version:%s, peer: %s: %v", tn.Name, tn.Version, overlay, err)
_ = s.Disconnect(overlay)
s.protocolsmu.RUnlock()
......
......@@ -100,8 +100,9 @@ type StreamSpec struct {
// Peer holds information about a Peer.
type Peer struct {
Address swarm.Address `json:"address"`
FullNode bool `json:"fullNode"`
Address swarm.Address
FullNode bool
EthereumAddress []byte
}
// HandlerFunc handles a received Stream from a Peer.
......
......@@ -5,6 +5,7 @@
package swap
import (
"errors"
"fmt"
"github.com/ethereum/go-ethereum/common"
......@@ -43,6 +44,8 @@ type Addressbook interface {
GetDeductionFor(peer swarm.Address) (bool, error)
// GetDeductionBy returns whether a peer have already received a cheque that has been deducted
GetDeductionBy(peer swarm.Address) (bool, error)
// MigratePeer returns whether a peer have already received a cheque that has been deducted
MigratePeer(oldPeer, newPeer swarm.Address) error
}
type addressbook struct {
......@@ -56,6 +59,40 @@ func NewAddressbook(store storage.StateStorer) Addressbook {
}
}
func (a *addressbook) MigratePeer(oldPeer, newPeer swarm.Address) error {
ba, known, err := a.Beneficiary(oldPeer)
if err != nil {
return err
}
if !known {
return errors.New("old beneficiary not known")
}
cb, known, err := a.Chequebook(oldPeer)
if err != nil {
return err
}
if err := a.PutBeneficiary(newPeer, ba); err != nil {
return err
}
if err := a.store.Delete(peerBeneficiaryKey(oldPeer)); err != nil {
return err
}
if known {
if err := a.PutChequebook(newPeer, cb); err != nil {
return err
}
if err := a.store.Delete(peerKey(oldPeer)); err != nil {
return err
}
}
return nil
}
// Beneficiary returns the beneficiary for the given peer.
func (a *addressbook) Beneficiary(peer swarm.Address) (beneficiary common.Address, known bool, err error) {
err = a.store.Get(peerBeneficiaryKey(peer), &beneficiary)
......
......@@ -22,8 +22,6 @@ import (
var (
// ErrWrongChequebook is the error if a peer uses a different chequebook from before.
ErrWrongChequebook = errors.New("wrong chequebook")
// ErrWrongBeneficiary is the error if a peer uses a different beneficiary than expected.
ErrWrongBeneficiary = errors.New("wrong beneficiary")
// ErrUnknownBeneficary is the error if a peer has never announced a beneficiary.
ErrUnknownBeneficary = errors.New("unknown beneficiary for peer")
// ErrChequeValueTooLow is the error a peer issued a cheque not covering 1 accounting credit
......@@ -236,8 +234,16 @@ func (s *Service) SettlementsReceived() (map[string]*big.Int, error) {
// Handshake is called by the swap protocol when a handshake is received.
func (s *Service) Handshake(peer swarm.Address, beneficiary common.Address) error {
oldPeer, known, err := s.addressbook.BeneficiaryPeer(beneficiary)
if err != nil {
return err
}
if known && !peer.Equal(oldPeer) {
s.logger.Debugf("migrating swap addresses from peer %s to %s", oldPeer, peer)
return s.addressbook.MigratePeer(oldPeer, peer)
}
storedBeneficiary, known, err := s.addressbook.Beneficiary(peer)
_, known, err = s.addressbook.Beneficiary(peer)
if err != nil {
return err
}
......@@ -245,9 +251,7 @@ func (s *Service) Handshake(peer swarm.Address, beneficiary common.Address) erro
s.logger.Tracef("initial swap handshake peer: %v beneficiary: %x", peer, beneficiary)
return s.addressbook.PutBeneficiary(peer, beneficiary)
}
if storedBeneficiary != beneficiary {
return ErrWrongBeneficiary
}
return nil
}
......
......@@ -91,6 +91,7 @@ func (t *testObserver) Disconnect(peer swarm.Address) {
}
type addressbookMock struct {
migratePeer func(oldPeer, newPeer swarm.Address) error
beneficiary func(peer swarm.Address) (beneficiary common.Address, known bool, err error)
chequebook func(peer swarm.Address) (chequebookAddress common.Address, known bool, err error)
beneficiaryPeer func(beneficiary common.Address) (peer swarm.Address, known bool, err error)
......@@ -103,6 +104,9 @@ type addressbookMock struct {
getDeductionBy func(peer swarm.Address) (bool, error)
}
func (m *addressbookMock) MigratePeer(oldPeer, newPeer swarm.Address) error {
return m.migratePeer(oldPeer, newPeer)
}
func (m *addressbookMock) Beneficiary(peer swarm.Address) (beneficiary common.Address, known bool, err error) {
return m.beneficiary(peer)
}
......@@ -542,6 +546,12 @@ func TestHandshake(t *testing.T) {
beneficiary: func(p swarm.Address) (common.Address, bool, error) {
return beneficiary, true, nil
},
beneficiaryPeer: func(common.Address) (peer swarm.Address, known bool, err error) {
return peer, true, nil
},
migratePeer: func(oldPeer, newPeer swarm.Address) error {
return nil
},
putBeneficiary: func(p swarm.Address, b common.Address) error {
putCalled = true
return nil
......@@ -582,6 +592,12 @@ func TestHandshakeNewPeer(t *testing.T) {
beneficiary: func(p swarm.Address) (common.Address, bool, error) {
return beneficiary, false, nil
},
beneficiaryPeer: func(beneficiary common.Address) (swarm.Address, bool, error) {
return peer, true, nil
},
migratePeer: func(oldPeer, newPeer swarm.Address) error {
return nil
},
putBeneficiary: func(p swarm.Address, b common.Address) error {
putCalled = true
return nil
......@@ -602,16 +618,14 @@ func TestHandshakeNewPeer(t *testing.T) {
}
}
func TestHandshakeWrongBeneficiary(t *testing.T) {
t.Skip()
func TestMigratePeer(t *testing.T) {
logger := logging.New(ioutil.Discard, 0)
store := mockstore.NewStateStore()
beneficiary := common.HexToAddress("0xcd")
peer := swarm.MustParseHexAddress("abcd")
trx := common.HexToHash("0x1")
networkID := uint64(1)
peer := crypto.NewOverlayFromEthereumAddress(beneficiary[:], networkID, trx.Bytes())
swapService := swap.New(
&swapProtocolMock{},
......@@ -619,15 +633,22 @@ func TestHandshakeWrongBeneficiary(t *testing.T) {
store,
mockchequebook.NewChequebook(),
mockchequestore.NewChequeStore(),
&addressbookMock{},
&addressbookMock{
beneficiaryPeer: func(beneficiary common.Address) (swarm.Address, bool, error) {
return swarm.MustParseHexAddress("00112233"), true, nil
},
migratePeer: func(oldPeer, newPeer swarm.Address) error {
return nil
},
},
networkID,
&cashoutMock{},
nil,
)
err := swapService.Handshake(peer, beneficiary)
if !errors.Is(err, swap.ErrWrongBeneficiary) {
t.Fatalf("wrong error. wanted %v, got %v", swap.ErrWrongBeneficiary, err)
if err != nil {
t.Fatal(err)
}
}
......
......@@ -27,7 +27,6 @@ const (
protocolName = "swap"
protocolVersion = "1.0.0"
streamName = "swap" // stream for cheques
initStreamName = "init" // stream for handshake
)
var (
......@@ -93,80 +92,15 @@ func (s *Service) Protocol() p2p.ProtocolSpec {
Handler: s.handler,
Headler: s.headler,
},
{
Name: initStreamName,
Handler: s.initHandler,
},
},
ConnectOut: s.init,
ConnectIn: s.init,
}
}
func (s *Service) initHandler(ctx context.Context, p p2p.Peer, stream p2p.Stream) (err error) {
w, r := protobuf.NewWriterAndReader(stream)
defer func() {
if err != nil {
_ = stream.Reset()
} else {
_ = stream.FullClose()
}
}()
var req pb.Handshake
if err := r.ReadMsgWithContext(ctx, &req); err != nil {
return fmt.Errorf("read request from peer %v: %w", p.Address, err)
}
if len(req.Beneficiary) != 20 {
return errors.New("malformed beneficiary address")
}
err = w.WriteMsgWithContext(ctx, &pb.Handshake{
Beneficiary: s.beneficiary.Bytes(),
})
if err != nil {
return err
}
beneficiary := common.BytesToAddress(req.Beneficiary)
return s.swap.Handshake(p.Address, beneficiary)
}
// init is called on outgoing connections and triggers handshake exchange
func (s *Service) init(ctx context.Context, p p2p.Peer) error {
ctx, cancel := context.WithTimeout(ctx, 5*time.Second)
defer cancel()
stream, err := s.streamer.NewStream(ctx, p.Address, nil, protocolName, protocolVersion, initStreamName)
if err != nil {
return err
}
defer func() {
if err != nil {
_ = stream.Reset()
} else {
_ = stream.FullClose() // wait for confirmation
}
}()
w, r := protobuf.NewWriterAndReader(stream)
err = w.WriteMsgWithContext(ctx, &pb.Handshake{
Beneficiary: s.beneficiary.Bytes(),
})
if err != nil {
return err
}
var req pb.Handshake
if err := r.ReadMsgWithContext(ctx, &req); err != nil {
return fmt.Errorf("read request from peer %v: %w", p.Address, err)
}
// any 20-byte byte-sequence is a valid eth address
if len(req.Beneficiary) != 20 {
return errors.New("malformed beneficiary address")
}
beneficiary := common.BytesToAddress(req.Beneficiary)
beneficiary := common.BytesToAddress(p.EthereumAddress)
return s.swap.Handshake(p.Address, beneficiary)
}
......
......@@ -28,68 +28,6 @@ import (
"github.com/ethersphere/bee/pkg/swarm"
)
func TestInit(t *testing.T) {
// Testing handshake
logger := logging.New(ioutil.Discard, 0)
commonAddr := common.HexToAddress("0xab")
peerID := swarm.MustParseHexAddress("9ee7add7")
peer := p2p.Peer{Address: peerID}
swapHsReceiver := swapmock.NewSwap()
swapHsInitiator := swapmock.NewSwap()
priceOracle := priceoraclemock.New(big.NewInt(50), big.NewInt(500))
swappHsReceiver := swapprotocol.New(nil, logger, commonAddr, priceOracle)
swappHsReceiver.SetSwap(swapHsReceiver)
recorder := streamtest.New(
streamtest.WithProtocols(swappHsReceiver.Protocol()),
streamtest.WithBaseAddr(peerID),
)
commonAddr2 := common.HexToAddress("0xdc")
swappHsInitiator := swapprotocol.New(recorder, logger, commonAddr2, priceOracle)
swappHsInitiator.SetSwap(swapHsInitiator)
if err := swappHsInitiator.Init(context.Background(), peer); err != nil {
t.Fatal("bad")
}
records, err := recorder.Records(peerID, "swap", "1.0.0", "init")
if err != nil {
t.Fatal(err)
}
if l := len(records); l != 1 {
t.Fatalf("got %v records, want %v", l, 1)
}
record := records[0]
messages, err := protobuf.ReadMessages(
bytes.NewReader(record.In()),
func() protobuf.Message { return new(pb.Handshake) },
)
if err != nil {
t.Fatal(err)
}
gotBeneficiary := messages[0].(*pb.Handshake).Beneficiary
if !bytes.Equal(gotBeneficiary, commonAddr2.Bytes()) {
t.Fatalf("got %v bytes, want %v bytes", gotBeneficiary, commonAddr2.Bytes())
}
if len(messages) != 1 {
t.Fatalf("got %v messages, want %v", len(messages), 1)
}
messages, err = protobuf.ReadMessages(
bytes.NewReader(record.Out()),
func() protobuf.Message { return new(pb.Handshake) },
)
if err != nil {
t.Fatal(err)
}
gotBeneficiary = messages[0].(*pb.Handshake).Beneficiary
if !bytes.Equal(gotBeneficiary, commonAddr.Bytes()) {
t.Fatalf("got %v bytes, want %v bytes", gotBeneficiary, commonAddr.Bytes())
}
if len(messages) != 1 {
t.Fatalf("got %v messages, want %v", len(messages), 1)
}
}
func TestEmitCheques(t *testing.T) {
// Test negotiating / sending cheques
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment