Commit c4d5b031 authored by Ralph Pichler's avatar Ralph Pichler Committed by GitHub

swap cheque exchange protocol and settlement (#732)

parent f5aa9716
......@@ -14,6 +14,7 @@ import (
"github.com/ethersphere/bee/pkg/accounting"
"github.com/ethersphere/bee/pkg/logging"
"github.com/ethersphere/bee/pkg/p2p"
"github.com/ethersphere/bee/pkg/settlement"
"github.com/ethersphere/bee/pkg/statestore/mock"
"github.com/ethersphere/bee/pkg/swarm"
)
......@@ -455,6 +456,9 @@ func (s *settlementMock) SettlementsReceived() (SettlementReceived map[string]ui
return nil, nil
}
func (s *settlementMock) SetPaymentObserver(settlement.PaymentObserver) {
}
// TestAccountingCallSettlement tests that settlement is called correctly if the payment threshold is hit
func TestAccountingCallSettlement(t *testing.T) {
logger := logging.New(ioutil.Discard, 0)
......
......@@ -24,16 +24,21 @@ const (
AddressSize = 20
)
// NewOverlayAddress constructs a Swarm Address from ECDSA private key.
// NewOverlayAddress constructs a Swarm Address from ECDSA public key.
func NewOverlayAddress(p ecdsa.PublicKey, networkID uint64) (swarm.Address, error) {
ethAddr, err := NewEthereumAddress(p)
if err != nil {
return swarm.ZeroAddress, err
}
return NewOverlayFromEthereumAddress(ethAddr, networkID), nil
}
// NewOverlayFromEthereumAddress constructs a Swarm Address for an Ethereum address.
func NewOverlayFromEthereumAddress(ethAddr []byte, networkID uint64) swarm.Address {
netIDBytes := make([]byte, 8)
binary.LittleEndian.PutUint64(netIDBytes, networkID)
h := sha3.Sum256(append(ethAddr, netIDBytes...))
return swarm.NewAddress(h[:]), nil
return swarm.NewAddress(h[:])
}
// GenerateSecp256k1Key generates an ECDSA private key using
......
......@@ -10,7 +10,7 @@ import (
"net/http"
"github.com/ethersphere/bee/pkg/jsonhttp"
"github.com/ethersphere/bee/pkg/settlement/pseudosettle"
"github.com/ethersphere/bee/pkg/settlement"
"github.com/ethersphere/bee/pkg/swarm"
"github.com/gorilla/mux"
)
......@@ -102,7 +102,7 @@ func (s *server) peerSettlementsHandler(w http.ResponseWriter, r *http.Request)
received, err := s.Settlement.TotalReceived(peer)
if err != nil {
if !errors.Is(err, pseudosettle.ErrPeerNoSettlements) {
if !errors.Is(err, settlement.ErrPeerNoSettlements) {
s.Logger.Debugf("debug api: settlements peer: get peer %s received settlement: %v", peer.String(), err)
s.Logger.Errorf("debug api: settlements peer: can't get peer %s received settlement", peer.String())
jsonhttp.InternalServerError(w, errCantSettlementsPeer)
......@@ -116,7 +116,7 @@ func (s *server) peerSettlementsHandler(w http.ResponseWriter, r *http.Request)
sent, err := s.Settlement.TotalSent(peer)
if err != nil {
if !errors.Is(err, pseudosettle.ErrPeerNoSettlements) {
if !errors.Is(err, settlement.ErrPeerNoSettlements) {
s.Logger.Debugf("debug api: settlements peer: get peer %s sent settlement: %v", peer.String(), err)
s.Logger.Errorf("debug api: settlements peer: can't get peer %s sent settlement", peer.String())
jsonhttp.InternalServerError(w, errCantSettlementsPeer)
......@@ -129,7 +129,7 @@ func (s *server) peerSettlementsHandler(w http.ResponseWriter, r *http.Request)
}
if !peerexists {
jsonhttp.NotFound(w, pseudosettle.ErrPeerNoSettlements)
jsonhttp.NotFound(w, settlement.ErrPeerNoSettlements)
return
}
......
......@@ -41,8 +41,11 @@ import (
"github.com/ethersphere/bee/pkg/recovery"
"github.com/ethersphere/bee/pkg/resolver/multiresolver"
"github.com/ethersphere/bee/pkg/retrieval"
"github.com/ethersphere/bee/pkg/settlement"
"github.com/ethersphere/bee/pkg/settlement/pseudosettle"
"github.com/ethersphere/bee/pkg/settlement/swap"
"github.com/ethersphere/bee/pkg/settlement/swap/chequebook"
"github.com/ethersphere/bee/pkg/settlement/swap/swapprotocol"
"github.com/ethersphere/bee/pkg/soc"
"github.com/ethersphere/bee/pkg/statestore/leveldb"
mockinmem "github.com/ethersphere/bee/pkg/statestore/mock"
......@@ -148,7 +151,8 @@ func NewBee(addr string, swarmAddress swarm.Address, keystore keystore.Service,
addressbook := addressbook.New(stateStore)
var chequebookService chequebook.Service
var chequeStore chequebook.ChequeStore
var overlayEthAddress common.Address
if o.SwapEnable {
swapBackend, err := ethclient.Dial(o.SwapEndpoint)
if err != nil {
......@@ -158,7 +162,7 @@ func NewBee(addr string, swarmAddress swarm.Address, keystore keystore.Service,
if err != nil {
return nil, err
}
overlayEthAddress, err := signer.EthereumAddress()
overlayEthAddress, err = signer.EthereumAddress()
if err != nil {
return nil, err
}
......@@ -166,7 +170,7 @@ func NewBee(addr string, swarmAddress swarm.Address, keystore keystore.Service,
// print ethereum address so users know which address we need to fund
logger.Infof("using ethereum address %x", overlayEthAddress)
chainId, err := swapBackend.ChainID(p2pCtx)
chainID, err := swapBackend.ChainID(p2pCtx)
if err != nil {
return nil, err
}
......@@ -184,10 +188,9 @@ func NewBee(addr string, swarmAddress swarm.Address, keystore keystore.Service,
return nil, err
}
chequeSigner := chequebook.NewChequeSigner(signer, chainId.Int64())
chequeSigner := chequebook.NewChequeSigner(signer, chainID.Int64())
// initialize chequebook logic
// return value is ignored because we don't do anything yet after initialization. this will be passed into swap settlement.
chequebookService, err = chequebook.Init(p2pCtx,
chequebookFactory,
stateStore,
......@@ -202,6 +205,8 @@ func NewBee(addr string, swarmAddress swarm.Address, keystore keystore.Service,
if err != nil {
return nil, err
}
chequeStore = chequebook.NewChequeStore(stateStore, swapBackend, chequebookFactory, chainID.Int64(), overlayEthAddress, chequebook.NewSimpleSwapBindings, chequebook.RecoverCheque)
}
p2ps, err := libp2p.New(p2pCtx, signer, networkID, swarmAddress, addr, addressbook, stateStore, logger, tracer, libp2p.Options{
......@@ -259,6 +264,38 @@ func NewBee(addr string, swarmAddress swarm.Address, keystore keystore.Service,
}
}
var settlement settlement.Interface
if o.SwapEnable {
swapProtocol := swapprotocol.New(p2ps, logger, overlayEthAddress)
swapAddressBook := swap.NewAddressbook(stateStore)
swapService := swap.New(swapProtocol, logger, stateStore, chequebookService, chequeStore, swapAddressBook, networkID)
swapProtocol.SetSwap(swapService)
if err = p2ps.AddProtocol(swapProtocol.Protocol()); err != nil {
return nil, fmt.Errorf("swap protocol: %w", err)
}
settlement = swapService
} else {
pseudosettleService := pseudosettle.New(p2ps, logger, stateStore)
if err = p2ps.AddProtocol(pseudosettleService.Protocol()); err != nil {
return nil, fmt.Errorf("pseudosettle service: %w", err)
}
settlement = pseudosettleService
}
acc, err := accounting.NewAccounting(accounting.Options{
Logger: logger,
Store: stateStore,
PaymentThreshold: o.PaymentThreshold,
PaymentTolerance: o.PaymentTolerance,
EarlyPayment: o.PaymentEarly,
Settlement: settlement,
})
if err != nil {
return nil, fmt.Errorf("accounting: %w", err)
}
settlement.SetPaymentObserver(acc)
kad := kademlia.New(swarmAddress, addressbook, hive, p2ps, logger, kademlia.Options{Bootnodes: bootnodes, Standalone: o.Standalone})
b.topologyCloser = kad
hive.SetAddPeersHandler(kad.AddPeers)
......@@ -286,26 +323,6 @@ func NewBee(addr string, swarmAddress swarm.Address, keystore keystore.Service,
}
b.localstoreCloser = storer
settlement := pseudosettle.New(p2ps, logger, stateStore)
if err = p2ps.AddProtocol(settlement.Protocol()); err != nil {
return nil, fmt.Errorf("pseudosettle service: %w", err)
}
acc, err := accounting.NewAccounting(accounting.Options{
Logger: logger,
Store: stateStore,
PaymentThreshold: o.PaymentThreshold,
PaymentTolerance: o.PaymentTolerance,
EarlyPayment: o.PaymentEarly,
Settlement: settlement,
})
if err != nil {
return nil, fmt.Errorf("accounting: %w", err)
}
settlement.SetPaymentObserver(acc)
chunkvalidator := swarm.NewChunkValidator(soc.NewValidator(), content.NewValidator())
retrieve := retrieval.New(swarmAddress, p2ps, kad, logger, acc, accounting.NewFixedPricer(swarmAddress, 10), chunkvalidator, tracer)
......@@ -413,7 +430,6 @@ func NewBee(addr string, swarmAddress swarm.Address, keystore keystore.Service,
debugAPIService.MustRegisterMetrics(p2ps.Metrics()...)
debugAPIService.MustRegisterMetrics(pingPong.Metrics()...)
debugAPIService.MustRegisterMetrics(acc.Metrics()...)
debugAPIService.MustRegisterMetrics(settlement.Metrics()...)
if apiService != nil {
debugAPIService.MustRegisterMetrics(apiService.Metrics()...)
......@@ -422,6 +438,10 @@ func NewBee(addr string, swarmAddress swarm.Address, keystore keystore.Service,
debugAPIService.MustRegisterMetrics(l.Metrics()...)
}
if l, ok := settlement.(metrics.Collector); ok {
debugAPIService.MustRegisterMetrics(l.Metrics()...)
}
debugAPIListener, err := net.Listen("tcp", o.DebugAPIAddr)
if err != nil {
return nil, fmt.Errorf("debug api listener: %w", err)
......
......@@ -6,10 +6,15 @@ package settlement
import (
"context"
"errors"
"github.com/ethersphere/bee/pkg/swarm"
)
var (
ErrPeerNoSettlements = errors.New("no settlements for peer")
)
// Interface is the interface used by Accounting to trigger settlement
type Interface interface {
// Pay initiates a payment to the given peer
......@@ -23,6 +28,8 @@ type Interface interface {
SettlementsSent() (map[string]uint64, error)
// SettlementsReceived returns received settlements for each individual known peer
SettlementsReceived() (map[string]uint64, error)
// SetPaymentObserver sets the PaymentObserver to notify
SetPaymentObserver(observer PaymentObserver)
}
// PaymentObserver is the interface Settlement uses to notify other components of an incoming payment
......
......@@ -104,6 +104,9 @@ func (s *Service) SettlementsReceived() (map[string]uint64, error) {
return s.settlementsRecv, nil
}
func (s *Service) SetPaymentObserver(settlement.PaymentObserver) {
}
// Option is the option passed to the mock settlement service
type Option interface {
apply(*Service)
......
......@@ -29,7 +29,6 @@ const (
var (
SettlementReceivedPrefix = "pseudosettle_total_received_"
SettlementSentPrefix = "pseudosettle_total_sent_"
ErrPeerNoSettlements = errors.New("no settlements for peer")
)
type Service struct {
......@@ -95,7 +94,7 @@ func (s *Service) handler(ctx context.Context, p p2p.Peer, stream p2p.Stream) (e
totalReceived, err := s.TotalReceived(p.Address)
if err != nil {
if !errors.Is(err, ErrPeerNoSettlements) {
if !errors.Is(err, settlement.ErrPeerNoSettlements) {
return err
}
totalReceived = 0
......@@ -136,7 +135,7 @@ func (s *Service) Pay(ctx context.Context, peer swarm.Address, amount uint64) er
}
totalSent, err := s.TotalSent(peer)
if err != nil {
if !errors.Is(err, ErrPeerNoSettlements) {
if !errors.Is(err, settlement.ErrPeerNoSettlements) {
return err
}
totalSent = 0
......@@ -160,7 +159,7 @@ func (s *Service) TotalSent(peer swarm.Address) (totalSent uint64, err error) {
err = s.store.Get(key, &totalSent)
if err != nil {
if errors.Is(err, storage.ErrNotFound) {
return 0, ErrPeerNoSettlements
return 0, settlement.ErrPeerNoSettlements
}
return 0, err
}
......@@ -173,7 +172,7 @@ func (s *Service) TotalReceived(peer swarm.Address) (totalReceived uint64, err e
err = s.store.Get(key, &totalReceived)
if err != nil {
if errors.Is(err, storage.ErrNotFound) {
return 0, ErrPeerNoSettlements
return 0, settlement.ErrPeerNoSettlements
}
return 0, err
}
......
// Copyright 2020 The Swarm Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package swap
import (
"fmt"
"github.com/ethereum/go-ethereum/common"
"github.com/ethersphere/bee/pkg/storage"
"github.com/ethersphere/bee/pkg/swarm"
)
var (
peerPrefix = "chequebook_peer_"
peerChequebookPrefix = "peer_chequebook_"
beneficiaryPeerPrefix = "beneficiary_peer_"
peerBeneficiaryPrefix = "peer_beneficiary_"
)
// Addressbook maps peers to beneficaries, chequebooks and in reverse.
type Addressbook interface {
// Beneficiary returns the beneficiary for the given peer.
Beneficiary(peer swarm.Address) (beneficiary common.Address, known bool, err error)
// Chequebook returns the chequebook for the given peer.
Chequebook(peer swarm.Address) (chequebookAddress common.Address, known bool, err error)
// BeneficiaryPeer returns the peer for a beneficiary.
BeneficiaryPeer(beneficiary common.Address) (peer swarm.Address, known bool, err error)
// ChequebookPeer returns the peer for a beneficiary.
ChequebookPeer(chequebook common.Address) (peer swarm.Address, known bool, err error)
// PutBeneficiary stores the beneficiary for the given peer.
PutBeneficiary(peer swarm.Address, beneficiary common.Address) error
// PutChequebook stores the chequebook for the given peer.
PutChequebook(peer swarm.Address, chequebook common.Address) error
}
type addressbook struct {
store storage.StateStorer
}
// NewAddressbook creates a new addressbook using the store.
func NewAddressbook(store storage.StateStorer) Addressbook {
return &addressbook{
store: store,
}
}
// Beneficiary returns the beneficiary for the given peer.
func (a *addressbook) Beneficiary(peer swarm.Address) (beneficiary common.Address, known bool, err error) {
err = a.store.Get(peerBeneficiaryKey(peer), &beneficiary)
if err != nil {
if err != storage.ErrNotFound {
return common.Address{}, false, err
}
return common.Address{}, false, nil
}
return beneficiary, true, nil
}
// BeneficiaryPeer returns the peer for a beneficiary.
func (a *addressbook) BeneficiaryPeer(beneficiary common.Address) (peer swarm.Address, known bool, err error) {
err = a.store.Get(beneficiaryPeerKey(beneficiary), &peer)
if err != nil {
if err != storage.ErrNotFound {
return swarm.Address{}, false, err
}
return swarm.Address{}, false, nil
}
return peer, true, nil
}
// Chequebook returns the chequebook for the given peer.
func (a *addressbook) Chequebook(peer swarm.Address) (chequebookAddress common.Address, known bool, err error) {
err = a.store.Get(peerKey(peer), &chequebookAddress)
if err != nil {
if err != storage.ErrNotFound {
return common.Address{}, false, err
}
return common.Address{}, false, nil
}
return chequebookAddress, true, nil
}
// ChequebookPeer returns the peer for a beneficiary.
func (a *addressbook) ChequebookPeer(chequebook common.Address) (peer swarm.Address, known bool, err error) {
err = a.store.Get(chequebookPeerKey(chequebook), &peer)
if err != nil {
if err != storage.ErrNotFound {
return swarm.Address{}, false, err
}
return swarm.Address{}, false, nil
}
return peer, true, nil
}
// PutBeneficiary stores the beneficiary for the given peer.
func (a *addressbook) PutBeneficiary(peer swarm.Address, beneficiary common.Address) error {
err := a.store.Put(peerBeneficiaryKey(peer), beneficiary)
if err != nil {
return err
}
return a.store.Put(beneficiaryPeerKey(beneficiary), peer)
}
// PutChequebook stores the chequebook for the given peer.
func (a *addressbook) PutChequebook(peer swarm.Address, chequebook common.Address) error {
err := a.store.Put(peerKey(peer), chequebook)
if err != nil {
return err
}
return a.store.Put(chequebookPeerKey(chequebook), peer)
}
// peerKey computes the key where to store the chequebook from a peer.
func peerKey(peer swarm.Address) string {
return fmt.Sprintf("%s%s", peerPrefix, peer)
}
// chequebookPeerKey computes the key where to store the peer for a chequebook.
func chequebookPeerKey(chequebook common.Address) string {
return fmt.Sprintf("%s%s", peerChequebookPrefix, chequebook)
}
// peerBeneficiaryKey computes the key where to store the beneficiary for a peer.
func peerBeneficiaryKey(peer swarm.Address) string {
return fmt.Sprintf("%s%s", peerBeneficiaryPrefix, peer)
}
// beneficiaryPeerKey computes the key where to store the peer for a beneficiary.
func beneficiaryPeerKey(peer common.Address) string {
return fmt.Sprintf("%s%s", beneficiaryPeerPrefix, peer)
}
......@@ -16,6 +16,7 @@ import (
// SimpleSwapBinding is the interface for the generated go bindings for ERC20SimpleSwap
type SimpleSwapBinding interface {
Balance(*bind.CallOpts) (*big.Int, error)
Issuer(*bind.CallOpts) (common.Address, error)
}
type SimpleSwapBindingFunc = func(common.Address, bind.ContractBackend) (SimpleSwapBinding, error)
......
......@@ -5,6 +5,7 @@
package chequebook
import (
"bytes"
"fmt"
"math/big"
......@@ -96,3 +97,20 @@ func (s *chequeSigner) Sign(cheque *Cheque) ([]byte, error) {
func (cheque *Cheque) String() string {
return fmt.Sprintf("Contract: %x Beneficiary: %x CumulativePayout: %v", cheque.Chequebook, cheque.Beneficiary, cheque.CumulativePayout)
}
func (cheque *Cheque) Equal(other *Cheque) bool {
if cheque.Beneficiary != other.Beneficiary {
return false
}
if cheque.CumulativePayout.Cmp(other.CumulativePayout) != 0 {
return false
}
return cheque.Chequebook == other.Chequebook
}
func (cheque *SignedCheque) Equal(other *SignedCheque) bool {
if !bytes.Equal(cheque.Signature, other.Signature) {
return false
}
return cheque.Cheque.Equal(&other.Cheque)
}
......@@ -10,6 +10,7 @@ import (
"fmt"
"math/big"
"strings"
"sync"
"github.com/ethereum/go-ethereum/accounts/abi"
"github.com/ethereum/go-ethereum/accounts/abi/bind"
......@@ -18,6 +19,13 @@ import (
"github.com/ethersphere/sw3-bindings/v2/simpleswapfactory"
)
// SendChequeFunc is a function to send cheques.
type SendChequeFunc func(cheque *SignedCheque) error
const (
lastIssuedChequeKeyPrefix = "chequebook_last_issued_cheque_"
)
// Service is the main interface for interacting with the nodes chequebook.
type Service interface {
// Deposit starts depositing erc20 token into the chequebook. This returns once the transactions has been broadcast.
......@@ -29,10 +37,15 @@ type Service interface {
// Address returns the address of the used chequebook contract.
Address() common.Address
// Issue a new cheque for the beneficiary with an cumulativePayout amount higher than the last.
Issue(beneficiary common.Address, amount *big.Int) (*SignedCheque, error)
Issue(beneficiary common.Address, amount *big.Int, sendChequeFunc SendChequeFunc) error
// LastCheque returns the last cheque we issued for the beneficiary.
LastCheque(beneficiary common.Address) (*SignedCheque, error)
// LastCheque returns the last cheques for all beneficiaries.
LastCheques() (map[common.Address]*SignedCheque, error)
}
type service struct {
lock sync.Mutex
backend Backend
transactionService TransactionService
......@@ -145,16 +158,23 @@ func (s *service) WaitForDeposit(ctx context.Context, txHash common.Hash) error
return nil
}
// Issue issues a new cheque.
func (s *service) Issue(beneficiary common.Address, amount *big.Int) (*SignedCheque, error) {
storeKey := fmt.Sprintf("chequebook_last_issued_cheque_%x", beneficiary)
// lastIssuedChequeKey computes the key where to store the last cheque for a beneficiary.
func lastIssuedChequeKey(beneficiary common.Address) string {
return fmt.Sprintf("chequebook_last_issued_cheque_%x", beneficiary)
}
// Issue issues a new cheque and passes it to sendChequeFunc
// if sendChequeFunc succeeds the cheque is considered sent and saved
func (s *service) Issue(beneficiary common.Address, amount *big.Int, sendChequeFunc SendChequeFunc) error {
// don't allow concurrent issuing of cheques
// this would be sufficient on a per beneficiary basis
s.lock.Lock()
defer s.lock.Unlock()
var cumulativePayout *big.Int
var lastCheque Cheque
err := s.store.Get(storeKey, &lastCheque)
lastCheque, err := s.LastCheque(beneficiary)
if err != nil {
if err != storage.ErrNotFound {
return nil, err
if err != ErrNoCheque {
return err
}
cumulativePayout = big.NewInt(0)
} else {
......@@ -164,24 +184,79 @@ func (s *service) Issue(beneficiary common.Address, amount *big.Int) (*SignedChe
// increase cumulativePayout by amount
cumulativePayout = cumulativePayout.Add(cumulativePayout, amount)
// create and sign the new cheque
cheque := Cheque{
Chequebook: s.address,
CumulativePayout: cumulativePayout,
Beneficiary: beneficiary,
}
sig, err := s.chequeSigner.Sign(&cheque)
sig, err := s.chequeSigner.Sign(&Cheque{
Chequebook: s.address,
CumulativePayout: cumulativePayout,
Beneficiary: beneficiary,
})
if err != nil {
return nil, err
return err
}
err = s.store.Put(storeKey, cheque)
// actually send the check before saving to avoid double payment
err = sendChequeFunc(&SignedCheque{
Cheque: cheque,
Signature: sig,
})
if err != nil {
return nil, err
return err
}
return &SignedCheque{
Cheque: cheque,
Signature: sig,
}, nil
return s.store.Put(lastIssuedChequeKey(beneficiary), cheque)
}
// LastCheque returns the last cheque we issued for the beneficiary.
func (s *service) LastCheque(beneficiary common.Address) (*SignedCheque, error) {
var lastCheque *SignedCheque
err := s.store.Get(lastIssuedChequeKey(beneficiary), &lastCheque)
if err != nil {
if err != storage.ErrNotFound {
return nil, err
}
return nil, ErrNoCheque
}
return lastCheque, nil
}
func keyBeneficiary(key []byte, prefix string) (beneficiary common.Address, err error) {
k := string(key)
split := strings.SplitAfter(k, prefix)
if len(split) != 2 {
return common.Address{}, errors.New("no beneficiary in key")
}
return common.HexToAddress(split[1]), nil
}
// LastCheque returns the last cheques for all beneficiaries.
func (s *service) LastCheques() (map[common.Address]*SignedCheque, error) {
result := make(map[common.Address]*SignedCheque)
err := s.store.Iterate(lastIssuedChequeKeyPrefix, func(key, val []byte) (stop bool, err error) {
addr, err := keyBeneficiary(key, lastIssuedChequeKeyPrefix)
if err != nil {
return false, fmt.Errorf("parse address from key: %s: %w", string(key), err)
}
if _, ok := result[addr]; !ok {
lastCheque, err := s.LastCheque(addr)
if err != nil {
return false, err
}
result[addr] = lastCheque
}
return false, nil
})
if err != nil {
return nil, err
}
return result, nil
}
......@@ -14,6 +14,8 @@ import (
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethersphere/bee/pkg/settlement/swap/chequebook"
storemock "github.com/ethersphere/bee/pkg/statestore/mock"
"github.com/ethersphere/bee/pkg/storage"
)
func newTestChequebook(
......@@ -23,6 +25,8 @@ func newTestChequebook(
address,
erc20address,
ownerAdress common.Address,
store storage.StateStorer,
chequeSigner chequebook.ChequeSigner,
simpleSwapBinding chequebook.SimpleSwapBinding,
erc20Binding chequebook.ERC20Binding) (chequebook.Service, error) {
return chequebook.New(
......@@ -31,8 +35,8 @@ func newTestChequebook(
address,
erc20address,
ownerAdress,
nil,
nil,
store,
chequeSigner,
func(addr common.Address, b bind.ContractBackend) (chequebook.SimpleSwapBinding, error) {
if addr != address {
t.Fatalf("initialised binding with wrong address. wanted %x, got %x", address, addr)
......@@ -65,6 +69,8 @@ func TestChequebookAddress(t *testing.T) {
address,
erc20address,
ownerAdress,
nil,
&chequeSignerMock{},
&simpleSwapBindingMock{},
&erc20BindingMock{})
if err != nil {
......@@ -88,6 +94,8 @@ func TestChequebookBalance(t *testing.T) {
address,
erc20address,
ownerAdress,
nil,
&chequeSignerMock{},
&simpleSwapBindingMock{
balance: func(*bind.CallOpts) (*big.Int, error) {
return balance, nil
......@@ -132,6 +140,8 @@ func TestChequebookDeposit(t *testing.T) {
address,
erc20address,
ownerAdress,
nil,
&chequeSignerMock{},
&simpleSwapBindingMock{},
&erc20BindingMock{
balanceOf: func(b *bind.CallOpts, addr common.Address) (*big.Int, error) {
......@@ -176,6 +186,8 @@ func TestChequebookWaitForDeposit(t *testing.T) {
address,
erc20address,
ownerAdress,
nil,
&chequeSignerMock{},
&simpleSwapBindingMock{},
&erc20BindingMock{})
if err != nil {
......@@ -209,6 +221,8 @@ func TestChequebookWaitForDepositReverted(t *testing.T) {
address,
erc20address,
ownerAdress,
nil,
&chequeSignerMock{},
&simpleSwapBindingMock{},
&erc20BindingMock{})
if err != nil {
......@@ -223,3 +237,195 @@ func TestChequebookWaitForDepositReverted(t *testing.T) {
t.Fatalf("wrong error. wanted %v, got %v", chequebook.ErrTransactionReverted, err)
}
}
func TestChequebookIssue(t *testing.T) {
address := common.HexToAddress("0xabcd")
erc20address := common.HexToAddress("0xefff")
beneficiary := common.HexToAddress("0xdddd")
ownerAdress := common.HexToAddress("0xfff")
store := storemock.NewStateStore()
amount := big.NewInt(20)
amount2 := big.NewInt(30)
expectedCumulative := big.NewInt(50)
sig := common.Hex2Bytes("0xffff")
chequeSigner := &chequeSignerMock{}
chequebookService, err := newTestChequebook(
t,
&backendMock{},
&transactionServiceMock{},
address,
erc20address,
ownerAdress,
store,
chequeSigner,
&simpleSwapBindingMock{},
&erc20BindingMock{})
if err != nil {
t.Fatal(err)
}
// issue a cheque
expectedCheque := &chequebook.SignedCheque{
Cheque: chequebook.Cheque{
Beneficiary: beneficiary,
CumulativePayout: amount,
Chequebook: address,
},
Signature: sig,
}
chequeSigner.sign = func(cheque *chequebook.Cheque) ([]byte, error) {
if !cheque.Equal(&expectedCheque.Cheque) {
t.Fatalf("wrong cheque. wanted %v got %v", expectedCheque.Cheque, cheque)
}
return sig, nil
}
err = chequebookService.Issue(beneficiary, amount, func(cheque *chequebook.SignedCheque) error {
if !cheque.Equal(expectedCheque) {
t.Fatalf("wrong cheque. wanted %v got %v", expectedCheque, cheque)
}
return nil
})
if err != nil {
t.Fatal(err)
}
lastCheque, err := chequebookService.LastCheque(beneficiary)
if err != nil {
t.Fatal(err)
}
if !lastCheque.Equal(expectedCheque) {
t.Fatalf("wrong cheque stored. wanted %v got %v", expectedCheque, lastCheque)
}
// issue another cheque for the same beneficiary
expectedCheque = &chequebook.SignedCheque{
Cheque: chequebook.Cheque{
Beneficiary: beneficiary,
CumulativePayout: expectedCumulative,
Chequebook: address,
},
Signature: sig,
}
chequeSigner.sign = func(cheque *chequebook.Cheque) ([]byte, error) {
if !cheque.Equal(&expectedCheque.Cheque) {
t.Fatalf("wrong cheque. wanted %v got %v", expectedCheque, cheque)
}
return sig, nil
}
err = chequebookService.Issue(beneficiary, amount2, func(cheque *chequebook.SignedCheque) error {
if !cheque.Equal(expectedCheque) {
t.Fatalf("wrong cheque. wanted %v got %v", expectedCheque, cheque)
}
return nil
})
if err != nil {
t.Fatal(err)
}
lastCheque, err = chequebookService.LastCheque(beneficiary)
if err != nil {
t.Fatal(err)
}
if !lastCheque.Equal(expectedCheque) {
t.Fatalf("wrong cheque stored. wanted %v got %v", expectedCheque, lastCheque)
}
// issue another cheque for the different beneficiary
expectedChequeOwner := &chequebook.SignedCheque{
Cheque: chequebook.Cheque{
Beneficiary: ownerAdress,
CumulativePayout: amount,
Chequebook: address,
},
Signature: sig,
}
chequeSigner.sign = func(cheque *chequebook.Cheque) ([]byte, error) {
if !cheque.Equal(&expectedChequeOwner.Cheque) {
t.Fatalf("wrong cheque. wanted %v got %v", expectedCheque, cheque)
}
return sig, nil
}
err = chequebookService.Issue(ownerAdress, amount, func(cheque *chequebook.SignedCheque) error {
if !cheque.Equal(expectedChequeOwner) {
t.Fatalf("wrong cheque. wanted %v got %v", expectedChequeOwner, cheque)
}
return nil
})
if err != nil {
t.Fatal(err)
}
lastCheque, err = chequebookService.LastCheque(ownerAdress)
if err != nil {
t.Fatal(err)
}
if !lastCheque.Equal(expectedChequeOwner) {
t.Fatalf("wrong cheque stored. wanted %v got %v", expectedChequeOwner, lastCheque)
}
// finally check this did not interfere with the beneficiary cheque
lastCheque, err = chequebookService.LastCheque(beneficiary)
if err != nil {
t.Fatal(err)
}
if !lastCheque.Equal(expectedCheque) {
t.Fatalf("wrong cheque stored. wanted %v got %v", expectedCheque, lastCheque)
}
}
func TestChequebookIssueErrorSend(t *testing.T) {
address := common.HexToAddress("0xabcd")
erc20address := common.HexToAddress("0xefff")
beneficiary := common.HexToAddress("0xdddd")
ownerAdress := common.HexToAddress("0xfff")
store := storemock.NewStateStore()
amount := big.NewInt(20)
sig := common.Hex2Bytes("0xffff")
chequeSigner := &chequeSignerMock{}
chequebookService, err := newTestChequebook(
t,
&backendMock{},
&transactionServiceMock{},
address,
erc20address,
ownerAdress,
store,
chequeSigner,
&simpleSwapBindingMock{},
&erc20BindingMock{})
if err != nil {
t.Fatal(err)
}
chequeSigner.sign = func(cheque *chequebook.Cheque) ([]byte, error) {
return sig, nil
}
err = chequebookService.Issue(beneficiary, amount, func(cheque *chequebook.SignedCheque) error {
return errors.New("err")
})
if err == nil {
t.Fatal("expected error")
}
// verify the cheque was not saved
_, err = chequebookService.LastCheque(beneficiary)
if err == nil {
t.Fatal("expected error")
}
if !errors.Is(err, chequebook.ErrNoCheque) {
t.Fatalf("wrong error. wanted %v, got %v", chequebook.ErrNoCheque, err)
}
}
// Copyright 2020 The Swarm Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package chequebook
import (
"context"
"errors"
"fmt"
"math/big"
"strings"
"sync"
"github.com/ethereum/go-ethereum/accounts/abi/bind"
"github.com/ethereum/go-ethereum/common"
"github.com/ethersphere/bee/pkg/crypto"
"github.com/ethersphere/bee/pkg/storage"
)
var (
// ErrNoCheque is the error returned if there is no prior cheque for a chequebook or beneficiary.
ErrNoCheque = errors.New("no cheque")
// ErrChequeNotIncreasing is the error returned if the cheque amount is the same or lower.
ErrChequeNotIncreasing = errors.New("cheque cumulativePayout is not increasing")
// ErrChequeInvalid is the error returned if the cheque itself is invalid.
ErrChequeInvalid = errors.New("invalid cheque")
// ErrWrongBeneficiary is the error returned if the cheque has the wrong beneficiary.
ErrWrongBeneficiary = errors.New("wrong beneficiary")
// ErrBouncingCheque is the error returned if the chequebook is demonstrably illiquid.
ErrBouncingCheque = errors.New("bouncing cheque")
lastReceivedChequePrefix = "chequebook_last_received_cheque_"
)
// ChequeStore handles the verification and storage of received cheques
type ChequeStore interface {
// ReceiveCheque verifies and stores a cheque. It returns the totam amount earned.
ReceiveCheque(ctx context.Context, cheque *SignedCheque) (*big.Int, error)
// LastCheque returns the last cheque we received from a specific chequebook.
LastCheque(chequebook common.Address) (*SignedCheque, error)
// LastCheques returns the last received cheques from every known chequebook.
LastCheques() (map[common.Address]*SignedCheque, error)
}
type chequeStore struct {
lock sync.Mutex
store storage.StateStorer
factory Factory
chaindID int64
simpleSwapBindingFunc SimpleSwapBindingFunc
backend Backend
beneficiary common.Address // the beneficiary we expect in cheques sent to us
recoverChequeFunc RecoverChequeFunc
}
type RecoverChequeFunc func(cheque *SignedCheque, chainID int64) (common.Address, error)
// NewChequeStore creates new ChequeStore
func NewChequeStore(
store storage.StateStorer,
backend Backend,
factory Factory,
chainID int64,
beneficiary common.Address,
simpleSwapBindingFunc SimpleSwapBindingFunc,
recoverChequeFunc RecoverChequeFunc) ChequeStore {
return &chequeStore{
store: store,
factory: factory,
backend: backend,
chaindID: chainID,
simpleSwapBindingFunc: simpleSwapBindingFunc,
beneficiary: beneficiary,
recoverChequeFunc: recoverChequeFunc,
}
}
// lastReceivedChequeKey computes the key where to store the last cheque received from a chequebook.
func lastReceivedChequeKey(chequebook common.Address) string {
return fmt.Sprintf("chequebook_last_received_cheque_%x", chequebook)
}
// LastCheque returns the last cheque we received from a specific chequebook.
func (s *chequeStore) LastCheque(chequebook common.Address) (*SignedCheque, error) {
var cheque *SignedCheque
err := s.store.Get(lastReceivedChequeKey(chequebook), &cheque)
if err != nil {
if err != storage.ErrNotFound {
return nil, err
}
return nil, ErrNoCheque
}
return cheque, nil
}
// ReceiveCheque verifies and stores a cheque. It returns the totam amount earned.
func (s *chequeStore) ReceiveCheque(ctx context.Context, cheque *SignedCheque) (*big.Int, error) {
// verify we are the beneficiary
if cheque.Beneficiary != s.beneficiary {
return nil, ErrWrongBeneficiary
}
// don't allow concurrent processing of cheques
// this would be sufficient on a per chequebook basis
s.lock.Lock()
defer s.lock.Unlock()
// load the lastCumulativePayout for the cheques chequebook
var lastCumulativePayout *big.Int
var lastReceivedCheque *SignedCheque
err := s.store.Get(lastReceivedChequeKey(cheque.Chequebook), &lastReceivedCheque)
if err != nil {
if err != storage.ErrNotFound {
return nil, err
}
// if this is the first cheque from this chequebook, verify with the factory.
err = s.factory.VerifyChequebook(ctx, cheque.Chequebook)
if err != nil {
return nil, err
}
lastCumulativePayout = big.NewInt(0)
} else {
lastCumulativePayout = lastReceivedCheque.CumulativePayout
}
// check this cheque is actually increasing in value
amount := big.NewInt(0).Sub(cheque.CumulativePayout, lastCumulativePayout)
if amount.Cmp(big.NewInt(0)) <= 0 {
return nil, ErrChequeNotIncreasing
}
// blockchain calls below
binding, err := s.simpleSwapBindingFunc(cheque.Chequebook, s.backend)
if err != nil {
return nil, err
}
// this does not change for the same chequebook
expectedIssuer, err := binding.Issuer(&bind.CallOpts{
Context: ctx,
})
if err != nil {
return nil, err
}
// verify the cheque signature
issuer, err := s.recoverChequeFunc(cheque, s.chaindID)
if err != nil {
return nil, err
}
if issuer != expectedIssuer {
return nil, ErrChequeInvalid
}
// basic liquidity check
// could be omitted as it is not particularly useful
balance, err := binding.Balance(&bind.CallOpts{
Context: ctx,
})
if err != nil {
return nil, err
}
if balance.Cmp(cheque.CumulativePayout) < 0 {
return nil, ErrBouncingCheque
}
// store the accepted cheque
err = s.store.Put(lastReceivedChequeKey(cheque.Chequebook), cheque)
if err != nil {
return nil, err
}
return amount, nil
}
// RecoverCheque recovers the issuer ethereum address from a signed cheque
func RecoverCheque(cheque *SignedCheque, chaindID int64) (common.Address, error) {
eip712Data := eip712DataForCheque(&cheque.Cheque, chaindID)
pubkey, err := crypto.RecoverEIP712(cheque.Signature, eip712Data)
if err != nil {
return common.Address{}, err
}
ethAddr, err := crypto.NewEthereumAddress(*pubkey)
if err != nil {
return common.Address{}, err
}
var issuer common.Address
copy(issuer[:], ethAddr)
return issuer, nil
}
// keyChequebook computes the chequebook a store entry is for.
func keyChequebook(key []byte, prefix string) (chequebook common.Address, err error) {
k := string(key)
split := strings.SplitAfter(k, prefix)
if len(split) != 2 {
return common.Address{}, errors.New("no peer in key")
}
return common.HexToAddress(split[1]), nil
}
// LastCheques returns the last received cheques from every known chequebook.
func (s *chequeStore) LastCheques() (map[common.Address]*SignedCheque, error) {
result := make(map[common.Address]*SignedCheque)
err := s.store.Iterate(lastReceivedChequePrefix, func(key, val []byte) (stop bool, err error) {
addr, err := keyChequebook(key, lastReceivedChequePrefix)
if err != nil {
return false, fmt.Errorf("parse address from key: %s: %w", string(key), err)
}
if _, ok := result[addr]; !ok {
lastCheque, err := s.LastCheque(addr)
if err != nil && err != ErrNoCheque {
return false, err
} else if err == ErrNoCheque {
return false, nil
}
result[addr] = lastCheque
}
return false, nil
})
if err != nil {
return nil, err
}
return result, nil
}
This diff is collapsed.
......@@ -99,14 +99,15 @@ func (m *simpleSwapFactoryBindingMock) ERC20Address(o *bind.CallOpts) (common.Ad
type simpleSwapBindingMock struct {
balance func(*bind.CallOpts) (*big.Int, error)
issuer func(*bind.CallOpts) (common.Address, error)
}
func (m *simpleSwapBindingMock) Balance(o *bind.CallOpts) (*big.Int, error) {
return m.balance(o)
}
func (m *simpleSwapBindingMock) Issuer(*bind.CallOpts) (common.Address, error) {
return common.Address{}, nil
func (m *simpleSwapBindingMock) Issuer(o *bind.CallOpts) (common.Address, error) {
return m.issuer(o)
}
type erc20BindingMock struct {
......@@ -141,3 +142,38 @@ func (*signerMock) PublicKey() (*ecdsa.PublicKey, error) {
func (m *signerMock) SignTypedData(d *eip712.TypedData) ([]byte, error) {
return m.signTypedData(d)
}
type chequeSignerMock struct {
sign func(cheque *chequebook.Cheque) ([]byte, error)
}
func (m *chequeSignerMock) Sign(cheque *chequebook.Cheque) ([]byte, error) {
return m.sign(cheque)
}
type factoryMock struct {
erc20Address func(ctx context.Context) (common.Address, error)
deploy func(ctx context.Context, issuer common.Address, defaultHardDepositTimeoutDuration *big.Int) (common.Address, error)
verifyBytecode func(ctx context.Context) error
verifyChequebook func(ctx context.Context, chequebook common.Address) error
}
// ERC20Address returns the token for which this factory deploys chequebooks.
func (m *factoryMock) ERC20Address(ctx context.Context) (common.Address, error) {
return m.erc20Address(ctx)
}
// Deploy deploys a new chequebook and returns once confirmed.
func (m *factoryMock) Deploy(ctx context.Context, issuer common.Address, defaultHardDepositTimeoutDuration *big.Int) (common.Address, error) {
return m.deploy(ctx, issuer, defaultHardDepositTimeoutDuration)
}
// VerifyBytecode checks that the factory is valid.
func (m *factoryMock) VerifyBytecode(ctx context.Context) error {
return m.verifyBytecode(ctx)
}
// VerifyChequebook checks that the supplied chequebook has been deployed by this factory.
func (m *factoryMock) VerifyChequebook(ctx context.Context, chequebook common.Address) error {
return m.verifyChequebook(ctx, chequebook)
}
......@@ -17,6 +17,7 @@ import (
type Service struct {
chequebookBalanceFunc func(context.Context) (*big.Int, error)
chequebookAddressFunc func() common.Address
chequebookIssueFunc func(beneficiary common.Address, amount *big.Int, sendChequeFunc chequebook.SendChequeFunc) error
}
// WithChequebook*Functions set the mock chequebook functions
......@@ -32,6 +33,12 @@ func WithChequebookAddressFunc(f func() common.Address) Option {
})
}
func WithChequebookIssueFunc(f func(beneficiary common.Address, amount *big.Int, sendChequeFunc chequebook.SendChequeFunc) error) Option {
return optionFunc(func(s *Service) {
s.chequebookIssueFunc = f
})
}
// NewChequebook creates the mock chequebook implementation
func NewChequebook(opts ...Option) chequebook.Service {
mock := new(Service)
......@@ -67,7 +74,18 @@ func (s *Service) Address() common.Address {
return common.Address{}
}
func (s *Service) Issue(beneficiary common.Address, amount *big.Int) (*chequebook.SignedCheque, error) {
func (s *Service) Issue(beneficiary common.Address, amount *big.Int, sendChequeFunc chequebook.SendChequeFunc) error {
if s.chequebookIssueFunc != nil {
return s.chequebookIssueFunc(beneficiary, amount, sendChequeFunc)
}
return nil
}
func (s *Service) LastCheque(beneficiary common.Address) (*chequebook.SignedCheque, error) {
return nil, errors.New("Error")
}
func (s *Service) LastCheques() (map[common.Address]*chequebook.SignedCheque, error) {
return nil, errors.New("Error")
}
......
// Copyright 2020 The Swarm Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package mock
import (
"context"
"math/big"
"github.com/ethereum/go-ethereum/common"
"github.com/ethersphere/bee/pkg/settlement/swap/chequebook"
)
// Service is the mock chequeStore service.
type Service struct {
receiveCheque func(ctx context.Context, cheque *chequebook.SignedCheque) (*big.Int, error)
lastCheque func(chequebook common.Address) (*chequebook.SignedCheque, error)
lastCheques func() (map[common.Address]*chequebook.SignedCheque, error)
}
func WithRetrieveChequeFunc(f func(ctx context.Context, cheque *chequebook.SignedCheque) (*big.Int, error)) Option {
return optionFunc(func(s *Service) {
s.receiveCheque = f
})
}
func WithLastChequeFunc(f func(chequebook common.Address) (*chequebook.SignedCheque, error)) Option {
return optionFunc(func(s *Service) {
s.lastCheque = f
})
}
func WithLastChequesFunc(f func() (map[common.Address]*chequebook.SignedCheque, error)) Option {
return optionFunc(func(s *Service) {
s.lastCheques = f
})
}
// NewChequeStore creates the mock chequeStore implementation
func NewChequeStore(opts ...Option) chequebook.ChequeStore {
mock := new(Service)
for _, o := range opts {
o.apply(mock)
}
return mock
}
func (s *Service) ReceiveCheque(ctx context.Context, cheque *chequebook.SignedCheque) (*big.Int, error) {
return s.receiveCheque(ctx, cheque)
}
func (s *Service) LastCheque(chequebook common.Address) (*chequebook.SignedCheque, error) {
return s.lastCheque(chequebook)
}
func (s *Service) LastCheques() (map[common.Address]*chequebook.SignedCheque, error) {
return s.lastCheques()
}
// Option is the option passed to the mock ChequeStore service
type Option interface {
apply(*Service)
}
type optionFunc func(*Service)
func (f optionFunc) apply(r *Service) { f(r) }
// Copyright 2020 The Swarm Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package swap
import (
m "github.com/ethersphere/bee/pkg/metrics"
"github.com/prometheus/client_golang/prometheus"
)
type metrics struct {
// all metrics fields must be exported
// to be able to return them by Metrics()
// using reflection
TotalReceived prometheus.Counter
TotalSent prometheus.Counter
}
func newMetrics() metrics {
subsystem := "swap"
return metrics{
TotalReceived: prometheus.NewCounter(prometheus.CounterOpts{
Namespace: m.Namespace,
Subsystem: subsystem,
Name: "total_received",
Help: "Amount of tokens received from peers (income of the node)",
}),
TotalSent: prometheus.NewCounter(prometheus.CounterOpts{
Namespace: m.Namespace,
Subsystem: subsystem,
Name: "total_sent",
Help: "Amount of tokens sent to peers (costs paid by the node)",
})}
}
func (s *Service) Metrics() []prometheus.Collector {
return m.PrometheusCollectorsFromFields(s.metrics)
}
// Copyright 2020 The Swarm Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package swap
import (
"context"
"errors"
"math/big"
"github.com/ethereum/go-ethereum/common"
"github.com/ethersphere/bee/pkg/crypto"
"github.com/ethersphere/bee/pkg/logging"
"github.com/ethersphere/bee/pkg/settlement"
"github.com/ethersphere/bee/pkg/settlement/swap/chequebook"
"github.com/ethersphere/bee/pkg/settlement/swap/swapprotocol"
"github.com/ethersphere/bee/pkg/storage"
"github.com/ethersphere/bee/pkg/swarm"
)
var (
// ErrWrongChequebook is the error if a peer uses a different chequebook from before.
ErrWrongChequebook = errors.New("wrong chequebook")
// ErrWrongBeneficiary is the error if a peer uses a different beneficiary than expected.
ErrWrongBeneficiary = errors.New("wrong beneficiary")
// ErrUnknownBeneficary is the error if a peer has never announced a beneficiary.
ErrUnknownBeneficary = errors.New("unknown beneficiary for peer")
)
// Service is the implementation of the swap settlement layer.
type Service struct {
proto swapprotocol.Interface
logger logging.Logger
store storage.StateStorer
observer settlement.PaymentObserver
metrics metrics
chequebook chequebook.Service
chequeStore chequebook.ChequeStore
addressbook Addressbook
networkID uint64
}
// New creates a new swap Service.
func New(proto swapprotocol.Interface, logger logging.Logger, store storage.StateStorer, chequebook chequebook.Service, chequeStore chequebook.ChequeStore, addressbook Addressbook, networkID uint64) *Service {
return &Service{
proto: proto,
logger: logger,
store: store,
metrics: newMetrics(),
chequebook: chequebook,
chequeStore: chequeStore,
addressbook: addressbook,
networkID: networkID,
}
}
// ReceiveCheque is called by the swap protocol if a cheque is received.
func (s *Service) ReceiveCheque(ctx context.Context, peer swarm.Address, cheque *chequebook.SignedCheque) (err error) {
// check this is the same chequebook for this peer as previously
expectedChequebook, known, err := s.addressbook.Chequebook(peer)
if err != nil {
return err
}
if known && expectedChequebook != cheque.Chequebook {
return ErrWrongChequebook
}
amount, err := s.chequeStore.ReceiveCheque(ctx, cheque)
if err != nil {
return err
}
if !known {
err = s.addressbook.PutChequebook(peer, cheque.Chequebook)
if err != nil {
return err
}
}
s.metrics.TotalReceived.Add(float64(amount.Uint64()))
return s.observer.NotifyPayment(peer, amount.Uint64())
}
// Pay initiates a payment to the given peer
func (s *Service) Pay(ctx context.Context, peer swarm.Address, amount uint64) error {
beneficiary, known, err := s.addressbook.Beneficiary(peer)
if err != nil {
return err
}
if !known {
return ErrUnknownBeneficary
}
err = s.chequebook.Issue(beneficiary, big.NewInt(int64(amount)), func(signedCheque *chequebook.SignedCheque) error {
return s.proto.EmitCheque(ctx, peer, signedCheque)
})
if err != nil {
return err
}
s.metrics.TotalSent.Add(float64(amount))
return nil
}
// SetPaymentObserver sets the payment observer which will be notified of incoming payments
func (s *Service) SetPaymentObserver(observer settlement.PaymentObserver) {
s.observer = observer
}
// TotalSent returns the total amount sent to a peer
func (s *Service) TotalSent(peer swarm.Address) (totalSent uint64, err error) {
beneficiary, known, err := s.addressbook.Beneficiary(peer)
if err != nil {
return 0, err
}
if !known {
return 0, settlement.ErrPeerNoSettlements
}
cheque, err := s.chequebook.LastCheque(beneficiary)
if err != nil {
if err == chequebook.ErrNoCheque {
return 0, settlement.ErrPeerNoSettlements
}
return 0, err
}
return cheque.CumulativePayout.Uint64(), nil
}
// TotalReceived returns the total amount received from a peer
func (s *Service) TotalReceived(peer swarm.Address) (totalReceived uint64, err error) {
chequebookAddress, known, err := s.addressbook.Chequebook(peer)
if err != nil {
return 0, err
}
if !known {
return 0, settlement.ErrPeerNoSettlements
}
cheque, err := s.chequeStore.LastCheque(chequebookAddress)
if err != nil {
if err == chequebook.ErrNoCheque {
return 0, settlement.ErrPeerNoSettlements
}
return 0, err
}
return cheque.CumulativePayout.Uint64(), nil
}
// SettlementsSent returns sent settlements for each individual known peer
func (s *Service) SettlementsSent() (map[string]uint64, error) {
result := make(map[string]uint64)
cheques, err := s.chequebook.LastCheques()
if err != nil {
return nil, err
}
for beneficiary, cheque := range cheques {
peer, known, err := s.addressbook.BeneficiaryPeer(beneficiary)
if err != nil {
return nil, err
}
if !known {
continue
}
result[peer.String()] = cheque.CumulativePayout.Uint64()
}
return result, nil
}
// SettlementsReceived returns received settlements for each individual known peer.
func (s *Service) SettlementsReceived() (map[string]uint64, error) {
result := make(map[string]uint64)
cheques, err := s.chequeStore.LastCheques()
if err != nil {
return nil, err
}
for chequebook, cheque := range cheques {
peer, known, err := s.addressbook.ChequebookPeer(chequebook)
if err != nil {
return nil, err
}
if !known {
continue
}
result[peer.String()] = cheque.CumulativePayout.Uint64()
}
return result, err
}
// Handshake is called by the swap protocol when a handshake is received.
func (s *Service) Handshake(peer swarm.Address, beneficiary common.Address) error {
// check that the overlay address was derived from the beneficiary (implying they have the same private key)
// while this is not strictly necessary for correct functionality we need to ensure no two peers use the same beneficiary
// as long as we enforce this we might not need the handshake message if the p2p layer exposed the overlay public key
expectedOverlay := crypto.NewOverlayFromEthereumAddress(beneficiary[:], s.networkID)
if !expectedOverlay.Equal(peer) {
return ErrWrongBeneficiary
}
storedBeneficiary, known, err := s.addressbook.Beneficiary(peer)
if err != nil {
return err
}
if !known {
s.logger.Tracef("initial swap handshake peer: %v beneficiary: %x", peer, beneficiary)
return s.addressbook.PutBeneficiary(peer, beneficiary)
}
if storedBeneficiary != beneficiary {
return ErrWrongBeneficiary
}
return nil
}
This diff is collapsed.
// Copyright 2020 The Swarm Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
//go:generate sh -c "protoc -I . -I \"$(go list -f '{{ .Dir }}' -m github.com/gogo/protobuf)/protobuf\" --gogofaster_out=. swap.proto"
package pb
This diff is collapsed.
// Copyright 2020 The Swarm Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
syntax = "proto3";
package swapprotocol;
option go_package = "pb";
message EmitCheque {
bytes Cheque = 1;
}
message Handshake {
bytes Beneficiary = 1;
}
// Copyright 2020 The Swarm Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package swapprotocol
import (
"context"
"encoding/json"
"errors"
"fmt"
"time"
"github.com/ethereum/go-ethereum/common"
"github.com/ethersphere/bee/pkg/logging"
"github.com/ethersphere/bee/pkg/p2p"
"github.com/ethersphere/bee/pkg/p2p/protobuf"
"github.com/ethersphere/bee/pkg/settlement/swap/chequebook"
"github.com/ethersphere/bee/pkg/settlement/swap/swapprotocol/pb"
"github.com/ethersphere/bee/pkg/swarm"
)
const (
protocolName = "swap"
protocolVersion = "1.0.0"
streamName = "swap" // stream for cheques
initStreamName = "init" // stream for handshake
)
// Interface is the main interface to send messages over swap protocol.
type Interface interface {
// EmitCheque sends a signed cheque to a peer.
EmitCheque(ctx context.Context, peer swarm.Address, cheque *chequebook.SignedCheque) error
}
// Swap is the interface the settlement layer should implement to receive cheques.
type Swap interface {
// ReceiveCheque is called by the swap protocol if a cheque is received.
ReceiveCheque(ctx context.Context, peer swarm.Address, cheque *chequebook.SignedCheque) error
// Handshake is called by the swap protocol when a handshake is received.
Handshake(peer swarm.Address, beneficiary common.Address) error
}
// Service is the main implementation of the swap protocol.
type Service struct {
streamer p2p.Streamer
logger logging.Logger
swap Swap
beneficiary common.Address
}
// New creates a new swap protocol Service.
func New(streamer p2p.Streamer, logger logging.Logger, beneficiary common.Address) *Service {
return &Service{
streamer: streamer,
logger: logger,
beneficiary: beneficiary,
}
}
// SetSwap sets the swap to notify.
func (s *Service) SetSwap(swap Swap) {
s.swap = swap
}
func (s *Service) Protocol() p2p.ProtocolSpec {
return p2p.ProtocolSpec{
Name: protocolName,
Version: protocolVersion,
StreamSpecs: []p2p.StreamSpec{
{
Name: streamName,
Handler: s.handler,
},
{
Name: initStreamName,
Handler: s.initHandler,
},
},
ConnectOut: s.init,
}
}
func (s *Service) initHandler(ctx context.Context, p p2p.Peer, stream p2p.Stream) (err error) {
w, r := protobuf.NewWriterAndReader(stream)
defer func() {
if err != nil {
_ = stream.Reset()
} else {
_ = stream.FullClose()
}
}()
var req pb.Handshake
if err := r.ReadMsg(&req); err != nil {
return fmt.Errorf("read request from peer %v: %w", p.Address, err)
}
if len(req.Beneficiary) != 20 {
return errors.New("malformed beneficiary address")
}
err = w.WriteMsgWithContext(ctx, &pb.Handshake{
Beneficiary: s.beneficiary.Bytes(),
})
if err != nil {
return err
}
beneficiary := common.BytesToAddress(req.Beneficiary)
return s.swap.Handshake(p.Address, beneficiary)
}
// init is called on outgoing connections and triggers handshake exchange
func (s *Service) init(ctx context.Context, p p2p.Peer) error {
ctx, cancel := context.WithTimeout(ctx, 5*time.Second)
defer cancel()
stream, err := s.streamer.NewStream(ctx, p.Address, nil, protocolName, protocolVersion, initStreamName)
if err != nil {
return err
}
defer func() {
if err != nil {
_ = stream.Reset()
} else {
_ = stream.FullClose() // wait for confirmation
}
}()
w, r := protobuf.NewWriterAndReader(stream)
err = w.WriteMsgWithContext(ctx, &pb.Handshake{
Beneficiary: s.beneficiary.Bytes(),
})
if err != nil {
return err
}
var req pb.Handshake
if err := r.ReadMsg(&req); err != nil {
return fmt.Errorf("read request from peer %v: %w", p.Address, err)
}
// any 20-byte byte-sequence is a valid eth address
if len(req.Beneficiary) != 20 {
return errors.New("malformed beneficiary address")
}
beneficiary := common.BytesToAddress(req.Beneficiary)
return s.swap.Handshake(p.Address, beneficiary)
}
func (s *Service) handler(ctx context.Context, p p2p.Peer, stream p2p.Stream) (err error) {
r := protobuf.NewReader(stream)
defer func() {
if err != nil {
_ = stream.Reset()
} else {
_ = stream.FullClose()
}
}()
var req pb.EmitCheque
if err := r.ReadMsg(&req); err != nil {
return fmt.Errorf("read request from peer %v: %w", p.Address, err)
}
var signedCheque *chequebook.SignedCheque
err = json.Unmarshal(req.Cheque, &signedCheque)
if err != nil {
return err
}
return s.swap.ReceiveCheque(ctx, p.Address, signedCheque)
}
// EmitCheque sends a signed cheque to a peer.
func (s *Service) EmitCheque(ctx context.Context, peer swarm.Address, cheque *chequebook.SignedCheque) error {
ctx, cancel := context.WithTimeout(ctx, 5*time.Second)
defer cancel()
stream, err := s.streamer.NewStream(ctx, peer, nil, protocolName, protocolVersion, streamName)
if err != nil {
return err
}
defer func() {
if err != nil {
_ = stream.Reset()
} else {
// wait for full close
// this ensure the accounting lock for this peer will be held long for the other peer to process the cheque
_ = stream.FullClose()
}
}()
// for simplicity we use json marshaller. can be replaced by a binary encoding in the future.
encodedCheque, err := json.Marshal(cheque)
if err != nil {
return err
}
s.logger.Tracef("sending cheque message to peer %v (%v)", peer, cheque)
w := protobuf.NewWriter(stream)
return w.WriteMsgWithContext(ctx, &pb.EmitCheque{
Cheque: encodedCheque,
})
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment