Commit c37b31db authored by Ralph Pichler's avatar Ralph Pichler Committed by GitHub

feat: async settlement (#1578)

parent 5bf7137f
......@@ -64,7 +64,7 @@ jobs:
run: |
echo -e "127.0.0.10\tregistry.localhost" | sudo tee -a /etc/hosts
for ((i=0; i<REPLICA; i++)); do echo -e "127.0.1.$((i+1))\tbee-${i}.localhost bee-${i}-debug.localhost"; done | sudo tee -a /etc/hosts
timeout 30m ./beeinfra.sh install --local -r "${REPLICA}" --bootnode /dnsaddr/localhost --geth --k3s --pay-threshold 1000000000000 --postage
timeout 30m ./beeinfra.sh install --local -r "${REPLICA}" --bootnode /dnsaddr/localhost --geth --k3s --pay-threshold 2000000000000 --postage
- name: Test pingpong
id: pingpong-1
run: until ./beekeeper check pingpong --api-scheme http --debug-api-scheme http --disable-namespace --debug-api-domain localhost --api-domain localhost --node-count "${REPLICA}"; do echo "waiting for pingpong..."; sleep .3; done
......@@ -73,7 +73,7 @@ jobs:
run: ./beekeeper check fullconnectivity --api-scheme http --debug-api-scheme http --disable-namespace --debug-api-domain localhost --api-domain localhost --node-count "${REPLICA}"
- name: Test settlements
id: settlements-1
run: ./beekeeper check settlements --api-scheme http --debug-api-scheme http --disable-namespace --debug-api-domain localhost --api-domain localhost --node-count "${REPLICA}" --upload-node-count "${REPLICA}" -t 1000000000000
run: ./beekeeper check settlements --api-scheme http --debug-api-scheme http --disable-namespace --debug-api-domain localhost --api-domain localhost --node-count "${REPLICA}" --upload-node-count "${REPLICA}" -t 2000000000000
- name: Test pushsync (chunks)
id: pushsync-chunks-1
run: ./beekeeper check pushsync --api-scheme http --debug-api-scheme http --disable-namespace --debug-api-domain localhost --api-domain localhost --node-count "${REPLICA}" --upload-node-count "${REPLICA}" --chunks-per-node 3 --upload-chunks --retry-delay 10s
......@@ -101,7 +101,7 @@ jobs:
cp /etc/rancher/k3s/k3s.yaml ~/.kube/config
- name: Set testing cluster (Node connection and clef enabled)
run: |
timeout 30m ./beeinfra.sh install --local -r "${REPLICA}" --geth --clef --k3s --pay-threshold 1000000000000 --postage
timeout 30m ./beeinfra.sh install --local -r "${REPLICA}" --geth --clef --k3s --pay-threshold 2000000000000 --postage
- name: Test pingpong
id: pingpong-2
run: until ./beekeeper check pingpong --api-scheme http --debug-api-scheme http --disable-namespace --debug-api-domain localhost --api-domain localhost --node-count "${REPLICA}"; do echo "waiting for pingpong..."; sleep .3; done
......@@ -110,7 +110,7 @@ jobs:
run: ./beekeeper check fullconnectivity --api-scheme http --debug-api-scheme http --disable-namespace --debug-api-domain localhost --api-domain localhost --node-count "${REPLICA}"
- name: Test settlements
id: settlements-2
run: ./beekeeper check settlements --api-scheme http --debug-api-scheme http --disable-namespace --debug-api-domain localhost --api-domain localhost --node-count "${REPLICA}" --upload-node-count "${REPLICA}" -t 1000000000000
run: ./beekeeper check settlements --api-scheme http --debug-api-scheme http --disable-namespace --debug-api-domain localhost --api-domain localhost --node-count "${REPLICA}" --upload-node-count "${REPLICA}" -t 2000000000000
- name: Destroy the cluster
run: |
./beeinfra.sh uninstall
......@@ -126,7 +126,7 @@ jobs:
cp /etc/rancher/k3s/k3s.yaml ~/.kube/config
- name: Set testing cluster (storage incentives setup)
run: |
timeout 10m ./beeinfra.sh install --local -r "${REPLICA}" --geth --k3s --pay-threshold 1000000000000 --postage --db-capacity 100
timeout 10m ./beeinfra.sh install --local -r "${REPLICA}" --geth --k3s --pay-threshold 2000000000000 --postage --db-capacity 100
- name: Test gc
id: gc-chunk-1
run: ./beekeeper check gc --cache-capacity 100 --api-scheme http --debug-api-scheme http --disable-namespace --debug-api-domain localhost --api-domain localhost --node-count "${REPLICA}"
......
......@@ -18,7 +18,6 @@ import (
"github.com/ethersphere/bee/pkg/logging"
"github.com/ethersphere/bee/pkg/p2p"
"github.com/ethersphere/bee/pkg/pricing"
"github.com/ethersphere/bee/pkg/settlement"
"github.com/ethersphere/bee/pkg/storage"
"github.com/ethersphere/bee/pkg/swarm"
)
......@@ -55,11 +54,14 @@ type Interface interface {
CompensatedBalances() (map[string]*big.Int, error)
}
type PayFunc func(context.Context, swarm.Address, *big.Int)
// accountingPeer holds all in-memory accounting information for one peer.
type accountingPeer struct {
lock sync.Mutex // lock to be held during any accounting action for this peer
reservedBalance *big.Int // amount currently reserved for active peer interaction
paymentThreshold *big.Int // the threshold at which the peer expects us to pay
paymentOngoing bool // indicate if we are currently settling with the peer
}
// Accounting is the main implementation of the accounting interface.
......@@ -75,7 +77,7 @@ type Accounting struct {
// disconnect them.
paymentTolerance *big.Int
earlyPayment *big.Int
settlement settlement.Interface
payFunction PayFunc
pricing pricing.Interface
metrics metrics
}
......@@ -100,7 +102,6 @@ func NewAccounting(
EarlyPayment *big.Int,
Logger logging.Logger,
Store storage.StateStorer,
Settlement settlement.Interface,
Pricing pricing.Interface,
) (*Accounting, error) {
return &Accounting{
......@@ -110,7 +111,6 @@ func NewAccounting(
earlyPayment: new(big.Int).Set(EarlyPayment),
logger: Logger,
store: Store,
settlement: Settlement,
pricing: Pricing,
metrics: newMetrics(),
}, nil
......@@ -118,10 +118,7 @@ func NewAccounting(
// Reserve reserves a portion of the balance for peer and attempts settlements if necessary.
func (a *Accounting) Reserve(ctx context.Context, peer swarm.Address, price uint64) error {
accountingPeer, err := a.getAccountingPeer(peer)
if err != nil {
return err
}
accountingPeer := a.getAccountingPeer(peer)
accountingPeer.lock.Lock()
defer accountingPeer.lock.Unlock()
......@@ -167,14 +164,10 @@ func (a *Accounting) Reserve(ctx context.Context, peer swarm.Address, price uint
// and we are actually in debt, trigger settlement.
// we pay early to avoid needlessly blocking request later when concurrent requests occur and we are already close to the payment threshold.
if increasedExpectedDebt.Cmp(threshold) >= 0 && currentBalance.Cmp(big.NewInt(0)) < 0 {
err = a.settle(ctx, peer, accountingPeer)
err = a.settle(context.Background(), peer, accountingPeer)
if err != nil {
return fmt.Errorf("failed to settle with peer %v: %v", peer, err)
}
// if we settled successfully our balance is back at 0
// and the expected debt therefore equals next reserved amount
expectedDebt = nextReserved
increasedExpectedDebt = new(big.Int).Add(expectedDebt, additionalDebt)
}
// if expectedDebt would still exceed the paymentThreshold at this point block this request
......@@ -190,11 +183,7 @@ func (a *Accounting) Reserve(ctx context.Context, peer swarm.Address, price uint
// Release releases reserved funds.
func (a *Accounting) Release(peer swarm.Address, price uint64) {
accountingPeer, err := a.getAccountingPeer(peer)
if err != nil {
a.logger.Errorf("cannot release balance for peer: %v", err)
return
}
accountingPeer := a.getAccountingPeer(peer)
accountingPeer.lock.Lock()
defer accountingPeer.lock.Unlock()
......@@ -213,10 +202,7 @@ func (a *Accounting) Release(peer swarm.Address, price uint64) {
// Credit increases the amount of credit we have with the given peer
// (and decreases existing debt).
func (a *Accounting) Credit(peer swarm.Address, price uint64) error {
accountingPeer, err := a.getAccountingPeer(peer)
if err != nil {
return err
}
accountingPeer := a.getAccountingPeer(peer)
accountingPeer.lock.Lock()
defer accountingPeer.lock.Unlock()
......@@ -246,6 +232,10 @@ func (a *Accounting) Credit(peer swarm.Address, price uint64) error {
// Settle all debt with a peer. The lock on the accountingPeer must be held when
// called.
func (a *Accounting) settle(ctx context.Context, peer swarm.Address, balance *accountingPeer) error {
if balance.paymentOngoing {
return nil
}
oldBalance, err := a.Balance(peer)
if err != nil {
if !errors.Is(err, ErrPeerNoBalance) {
......@@ -263,24 +253,9 @@ func (a *Accounting) settle(ctx context.Context, peer swarm.Address, balance *ac
// This is safe because of the earlier check for oldbalance < 0 and the check for != MinInt64
paymentAmount := new(big.Int).Neg(oldBalance)
// Try to save the next balance first.
// Otherwise we might pay and then not be able to save, forcing us to pay
// again after restart.
err = a.store.Put(peerBalanceKey(peer), big.NewInt(0))
if err != nil {
return fmt.Errorf("failed to persist balance: %w", err)
}
balance.paymentOngoing = true
err = a.settlement.Pay(ctx, peer, paymentAmount)
if err != nil {
err = fmt.Errorf("settlement for amount %d failed: %w", paymentAmount, err)
// If the payment didn't succeed we should restore the old balance in
// the state store.
if storeErr := a.store.Put(peerBalanceKey(peer), oldBalance); storeErr != nil {
a.logger.Errorf("failed to restore balance after failed settlement for peer %v: %v", peer, storeErr)
}
return err
}
go a.payFunction(ctx, peer, paymentAmount)
return nil
}
......@@ -288,16 +263,14 @@ func (a *Accounting) settle(ctx context.Context, peer swarm.Address, balance *ac
// Debit increases the amount of debt we have with the given peer (and decreases
// existing credit).
func (a *Accounting) Debit(peer swarm.Address, price uint64) error {
accountingPeer, err := a.getAccountingPeer(peer)
if err != nil {
return err
}
accountingPeer := a.getAccountingPeer(peer)
accountingPeer.lock.Lock()
defer accountingPeer.lock.Unlock()
cost := new(big.Int).SetUint64(price)
// see if peer has surplus balance to deduct this transaction of
surplusBalance, err := a.SurplusBalance(peer)
if err != nil {
return fmt.Errorf("failed to get surplus balance: %w", err)
......@@ -439,7 +412,7 @@ func peerSurplusBalanceKey(peer swarm.Address) string {
// getAccountingPeer returns the accountingPeer for a given swarm address.
// If not found in memory it will initialize it.
func (a *Accounting) getAccountingPeer(peer swarm.Address) (*accountingPeer, error) {
func (a *Accounting) getAccountingPeer(peer swarm.Address) *accountingPeer {
a.accountingPeersMu.Lock()
defer a.accountingPeersMu.Unlock()
......@@ -453,7 +426,7 @@ func (a *Accounting) getAccountingPeer(peer swarm.Address) (*accountingPeer, err
a.accountingPeers[peer.String()] = peerData
}
return peerData, nil
return peerData
}
// Balances gets balances for all peers from store.
......@@ -569,11 +542,8 @@ func surplusBalanceKeyPeer(key []byte) (swarm.Address, error) {
}
// NotifyPayment is called by Settlement when we receive a payment.
func (a *Accounting) NotifyPayment(peer swarm.Address, amount *big.Int) error {
accountingPeer, err := a.getAccountingPeer(peer)
if err != nil {
return err
}
func (a *Accounting) NotifyPaymentReceived(peer swarm.Address, amount *big.Int) error {
accountingPeer := a.getAccountingPeer(peer)
accountingPeer.lock.Lock()
defer accountingPeer.lock.Unlock()
......@@ -644,28 +614,67 @@ func (a *Accounting) NotifyPayment(peer swarm.Address, amount *big.Int) error {
return nil
}
// AsyncNotifyPayment calls notify payment in a go routine.
// This is needed when accounting needs to be notified but the accounting lock is already held.
func (a *Accounting) AsyncNotifyPayment(peer swarm.Address, amount *big.Int) error {
go func() {
err := a.NotifyPayment(peer, amount)
if err != nil {
a.logger.Errorf("failed to notify accounting of payment: %v", err)
}
}()
// NotifyPaymentThreshold should be called to notify accounting of changes in the payment threshold
func (a *Accounting) NotifyPaymentThreshold(peer swarm.Address, paymentThreshold *big.Int) error {
accountingPeer := a.getAccountingPeer(peer)
accountingPeer.lock.Lock()
defer accountingPeer.lock.Unlock()
accountingPeer.paymentThreshold.Set(paymentThreshold)
return nil
}
// NotifyPaymentThreshold should be called to notify accounting of changes in the payment threshold
func (a *Accounting) NotifyPaymentThreshold(peer swarm.Address, paymentThreshold *big.Int) error {
accountingPeer, err := a.getAccountingPeer(peer)
func (a *Accounting) PeerDebt(peer swarm.Address) (*big.Int, error) {
zero := big.NewInt(0)
balance, err := a.Balance(peer)
if err != nil {
return err
if errors.Is(err, ErrPeerNoBalance) {
return zero, nil
}
return nil, err
}
if balance.Cmp(zero) <= 0 {
return zero, nil
}
return balance, nil
}
func (a *Accounting) NotifyPaymentSent(peer swarm.Address, amount *big.Int, receivedError error) {
accountingPeer := a.getAccountingPeer(peer)
accountingPeer.lock.Lock()
defer accountingPeer.lock.Unlock()
accountingPeer.paymentThreshold.Set(paymentThreshold)
return nil
accountingPeer.paymentOngoing = false
if receivedError != nil {
a.logger.Warningf("accouting: payment failure %v", receivedError)
return
}
currentBalance, err := a.Balance(peer)
if err != nil {
if !errors.Is(err, ErrPeerNoBalance) {
a.logger.Warningf("accounting: notifypaymentsent failed to load balance: %v", err)
return
}
}
// Get nextBalance by safely increasing current balance with price
nextBalance := new(big.Int).Add(currentBalance, amount)
a.logger.Tracef("registering payment sent to peer %v with amount %d, new balance is %d", peer, amount, nextBalance)
err = a.store.Put(peerBalanceKey(peer), nextBalance)
if err != nil {
a.logger.Warningf("accounting: notifypaymentsent failed to persist balance: %v", err)
return
}
}
func (a *Accounting) SetPayFunc(f PayFunc) {
a.payFunction = f
}
This diff is collapsed.
......@@ -16,6 +16,7 @@ import (
"github.com/ethersphere/bee/pkg/crypto"
"github.com/ethersphere/bee/pkg/logging"
"github.com/ethersphere/bee/pkg/p2p/libp2p"
"github.com/ethersphere/bee/pkg/settlement"
"github.com/ethersphere/bee/pkg/settlement/swap"
"github.com/ethersphere/bee/pkg/settlement/swap/chequebook"
"github.com/ethersphere/bee/pkg/settlement/swap/swapprotocol"
......@@ -185,6 +186,7 @@ func InitSwap(
chequebookService chequebook.Service,
chequeStore chequebook.ChequeStore,
cashoutService chequebook.CashoutService,
accountingAPI settlement.AccountingAPI,
) (*swap.Service, error) {
swapProtocol := swapprotocol.New(p2ps, logger, overlayEthAddress)
swapAddressBook := swap.NewAddressbook(stateStore)
......@@ -199,6 +201,7 @@ func InitSwap(
networkID,
cashoutService,
p2ps,
accountingAPI,
)
swapProtocol.SetSwap(swapService)
......
......@@ -461,6 +461,26 @@ func NewBee(addr string, swarmAddress swarm.Address, publicKey ecdsa.PublicKey,
logger.Debugf("p2p address: %s", addr)
}
paymentTolerance, ok := new(big.Int).SetString(o.PaymentTolerance, 10)
if !ok {
return nil, fmt.Errorf("invalid payment tolerance: %s", paymentTolerance)
}
paymentEarly, ok := new(big.Int).SetString(o.PaymentEarly, 10)
if !ok {
return nil, fmt.Errorf("invalid payment early: %s", paymentEarly)
}
acc, err := accounting.NewAccounting(
paymentThreshold,
paymentTolerance,
paymentEarly,
logger,
stateStore,
pricing,
)
if err != nil {
return nil, fmt.Errorf("accounting: %w", err)
}
if o.SwapEnable {
swapService, err = InitSwap(
p2ps,
......@@ -471,43 +491,23 @@ func NewBee(addr string, swarmAddress swarm.Address, publicKey ecdsa.PublicKey,
chequebookService,
chequeStore,
cashoutService,
acc,
)
if err != nil {
return nil, err
}
settlement = swapService
} else {
pseudosettleService := pseudosettle.New(p2ps, logger, stateStore)
pseudosettleService := pseudosettle.New(p2ps, logger, stateStore, acc)
if err = p2ps.AddProtocol(pseudosettleService.Protocol()); err != nil {
return nil, fmt.Errorf("pseudosettle service: %w", err)
}
settlement = pseudosettleService
}
paymentTolerance, ok := new(big.Int).SetString(o.PaymentTolerance, 10)
if !ok {
return nil, fmt.Errorf("invalid payment tolerance: %s", paymentTolerance)
}
paymentEarly, ok := new(big.Int).SetString(o.PaymentEarly, 10)
if !ok {
return nil, fmt.Errorf("invalid payment early: %s", paymentEarly)
}
acc, err := accounting.NewAccounting(
paymentThreshold,
paymentTolerance,
paymentEarly,
logger,
stateStore,
settlement,
pricing,
)
if err != nil {
return nil, fmt.Errorf("accounting: %w", err)
}
acc.SetPayFunc(settlement.Pay)
pricing.SetPaymentThresholdObserver(acc)
settlement.SetNotifyPaymentFunc(acc.AsyncNotifyPayment)
retrieve := retrieval.New(swarmAddress, storer, p2ps, kad, logger, acc, pricer, tracer)
tagService := tags.NewTags(stateStore, logger)
......
......@@ -20,7 +20,7 @@ var (
type Interface interface {
// Pay initiates a payment to the given peer
// It should return without error it is likely that the payment worked
Pay(ctx context.Context, peer swarm.Address, amount *big.Int) error
Pay(ctx context.Context, peer swarm.Address, amount *big.Int)
// TotalSent returns the total amount sent to a peer
TotalSent(peer swarm.Address) (totalSent *big.Int, err error)
// TotalReceived returns the total amount received from a peer
......@@ -29,9 +29,10 @@ type Interface interface {
SettlementsSent() (map[string]*big.Int, error)
// SettlementsReceived returns received settlements for each individual known peer
SettlementsReceived() (map[string]*big.Int, error)
// SetNotifyPaymentFunc sets the NotifyPaymentFunc to notify
SetNotifyPaymentFunc(notifyPaymentFunc NotifyPaymentFunc)
}
// NotifyPaymentFunc is called when a payment from peer was successfully received
type NotifyPaymentFunc func(peer swarm.Address, amount *big.Int) error
type AccountingAPI interface {
PeerDebt(peer swarm.Address) (*big.Int, error)
NotifyPaymentReceived(peer swarm.Address, amount *big.Int) error
NotifyPaymentSent(peer swarm.Address, amount *big.Int, receivedError error)
}
......@@ -33,19 +33,20 @@ var (
)
type Service struct {
streamer p2p.Streamer
logger logging.Logger
store storage.StateStorer
notifyPaymentFunc settlement.NotifyPaymentFunc
metrics metrics
streamer p2p.Streamer
logger logging.Logger
store storage.StateStorer
accountingAPI settlement.AccountingAPI
metrics metrics
}
func New(streamer p2p.Streamer, logger logging.Logger, store storage.StateStorer) *Service {
func New(streamer p2p.Streamer, logger logging.Logger, store storage.StateStorer, accountingAPI settlement.AccountingAPI) *Service {
return &Service{
streamer: streamer,
logger: logger,
metrics: newMetrics(),
store: store,
streamer: streamer,
logger: logger,
metrics: newMetrics(),
store: store,
accountingAPI: accountingAPI,
}
}
......@@ -106,17 +107,22 @@ func (s *Service) handler(ctx context.Context, p p2p.Peer, stream p2p.Stream) (e
return err
}
return s.notifyPaymentFunc(p.Address, new(big.Int).SetUint64(req.Amount))
return s.accountingAPI.NotifyPaymentReceived(p.Address, new(big.Int).SetUint64(req.Amount))
}
// Pay initiates a payment to the given peer
func (s *Service) Pay(ctx context.Context, peer swarm.Address, amount *big.Int) error {
func (s *Service) Pay(ctx context.Context, peer swarm.Address, amount *big.Int) {
ctx, cancel := context.WithTimeout(ctx, 5*time.Second)
defer cancel()
var err error
defer func() {
if err != nil {
s.accountingAPI.NotifyPaymentSent(peer, nil, err)
}
}()
stream, err := s.streamer.NewStream(ctx, peer, nil, protocolName, protocolVersion, streamName)
if err != nil {
return err
return
}
defer func() {
if err != nil {
......@@ -132,28 +138,27 @@ func (s *Service) Pay(ctx context.Context, peer swarm.Address, amount *big.Int)
Amount: amount.Uint64(),
})
if err != nil {
return err
return
}
totalSent, err := s.TotalSent(peer)
if err != nil {
if !errors.Is(err, settlement.ErrPeerNoSettlements) {
return err
return
}
totalSent = big.NewInt(0)
}
err = s.store.Put(totalKey(peer, SettlementSentPrefix), totalSent.Add(totalSent, amount))
if err != nil {
return err
return
}
s.accountingAPI.NotifyPaymentSent(peer, amount, nil)
amountFloat, _ := new(big.Float).SetInt(amount).Float64()
s.metrics.TotalSentPseudoSettlements.Add(amountFloat)
return nil
}
// SetNotifyPaymentFunc sets the NotifyPaymentFunc to notify
func (s *Service) SetNotifyPaymentFunc(notifyPaymentFunc settlement.NotifyPaymentFunc) {
s.notifyPaymentFunc = notifyPaymentFunc
func (s *Service) SetAccountingAPI(accountingAPI settlement.AccountingAPI) {
s.accountingAPI = accountingAPI
}
// TotalSent returns the total amount sent to a peer
......
......@@ -22,24 +22,47 @@ import (
)
type testObserver struct {
called chan struct{}
receivedCalled chan notifyPaymentReceivedCall
sentCalled chan notifyPaymentSentCall
}
type notifyPaymentReceivedCall struct {
peer swarm.Address
amount *big.Int
}
type notifyPaymentSentCall struct {
peer swarm.Address
amount *big.Int
err error
}
func newTestObserver() *testObserver {
return &testObserver{
called: make(chan struct{}),
receivedCalled: make(chan notifyPaymentReceivedCall, 1),
sentCalled: make(chan notifyPaymentSentCall, 1),
}
}
func (t *testObserver) NotifyPayment(peer swarm.Address, amount *big.Int) error {
close(t.called)
t.peer = peer
t.amount = amount
func (t *testObserver) PeerDebt(peer swarm.Address) (*big.Int, error) {
return nil, nil
}
func (t *testObserver) NotifyPaymentReceived(peer swarm.Address, amount *big.Int) error {
t.receivedCalled <- notifyPaymentReceivedCall{
peer: peer,
amount: amount,
}
return nil
}
func (t *testObserver) NotifyPaymentSent(peer swarm.Address, amount *big.Int, err error) {
t.sentCalled <- notifyPaymentSentCall{
peer: peer,
amount: amount,
err: err,
}
}
func TestPayment(t *testing.T) {
logger := logging.New(ioutil.Discard, 0)
......@@ -47,8 +70,7 @@ func TestPayment(t *testing.T) {
defer storeRecipient.Close()
observer := newTestObserver()
recipient := pseudosettle.New(nil, logger, storeRecipient)
recipient.SetNotifyPaymentFunc(observer.NotifyPayment)
recipient := pseudosettle.New(nil, logger, storeRecipient, observer)
peerID := swarm.MustParseHexAddress("9ee7add7")
......@@ -60,14 +82,13 @@ func TestPayment(t *testing.T) {
storePayer := mock.NewStateStore()
defer storePayer.Close()
payer := pseudosettle.New(recorder, logger, storePayer)
observer2 := newTestObserver()
payer := pseudosettle.New(recorder, logger, storePayer, observer2)
payer.SetAccountingAPI(observer2)
amount := big.NewInt(10000)
err := payer.Pay(context.Background(), peerID, amount)
if err != nil {
t.Fatal(err)
}
payer.Pay(context.Background(), peerID, amount)
records, err := recorder.Records(peerID, "pseudosettle", "1.0.0", "pseudosettle")
if err != nil {
......@@ -102,17 +123,34 @@ func TestPayment(t *testing.T) {
}
select {
case <-observer.called:
case call := <-observer.receivedCalled:
if call.amount.Cmp(amount) != 0 {
t.Fatalf("observer called with wrong amount. got %d, want %d", call.amount, amount)
}
if !call.peer.Equal(peerID) {
t.Fatalf("observer called with wrong peer. got %v, want %v", call.peer, peerID)
}
case <-time.After(time.Second):
t.Fatal("expected observer to be called")
}
if observer.amount.Cmp(amount) != 0 {
t.Fatalf("observer called with wrong amount. got %d, want %d", observer.amount, amount)
}
select {
case call := <-observer2.sentCalled:
if call.amount.Cmp(amount) != 0 {
t.Fatalf("observer called with wrong amount. got %d, want %d", call.amount, amount)
}
if !call.peer.Equal(peerID) {
t.Fatalf("observer called with wrong peer. got %v, want %v", call.peer, peerID)
}
if call.err != nil {
t.Fatalf("observer called with error. got %v want nil", call.err)
}
if !observer.peer.Equal(peerID) {
t.Fatalf("observer called with wrong peer. got %v, want %v", observer.peer, peerID)
case <-time.After(time.Second):
t.Fatal("expected observer to be called")
}
totalSent, err := payer.TotalSent(peerID)
......
......@@ -26,12 +26,11 @@ type Service struct {
settlementsSentFunc func() (map[string]*big.Int, error)
settlementsRecvFunc func() (map[string]*big.Int, error)
receiveChequeFunc func(context.Context, swarm.Address, *chequebook.SignedCheque) error
payFunc func(context.Context, swarm.Address, *big.Int) error
setNotifyPaymentFunc settlement.NotifyPaymentFunc
handshakeFunc func(swarm.Address, common.Address) error
lastSentChequeFunc func(swarm.Address) (*chequebook.SignedCheque, error)
lastSentChequesFunc func() (map[string]*chequebook.SignedCheque, error)
receiveChequeFunc func(context.Context, swarm.Address, *chequebook.SignedCheque) error
payFunc func(context.Context, swarm.Address, *big.Int)
handshakeFunc func(swarm.Address, common.Address) error
lastSentChequeFunc func(swarm.Address) (*chequebook.SignedCheque, error)
lastSentChequesFunc func() (map[string]*chequebook.SignedCheque, error)
lastReceivedChequeFunc func(swarm.Address) (*chequebook.SignedCheque, error)
lastReceivedChequesFunc func() (map[string]*chequebook.SignedCheque, error)
......@@ -72,19 +71,12 @@ func WithReceiveChequeFunc(f func(context.Context, swarm.Address, *chequebook.Si
})
}
func WithPayFunc(f func(context.Context, swarm.Address, *big.Int) error) Option {
func WithPayFunc(f func(context.Context, swarm.Address, *big.Int)) Option {
return optionFunc(func(s *Service) {
s.payFunc = f
})
}
// WithsettlementsFunc sets the mock settlements function
func WithSetNotifyPaymentFunc(f settlement.NotifyPaymentFunc) Option {
return optionFunc(func(s *Service) {
s.setNotifyPaymentFunc = f
})
}
func WithHandshakeFunc(f func(swarm.Address, common.Address) error) Option {
return optionFunc(func(s *Service) {
s.handshakeFunc = f
......@@ -155,22 +147,16 @@ func (s *Service) ReceiveCheque(ctx context.Context, peer swarm.Address, cheque
}
// Pay is the mock Pay function of swap.
func (s *Service) Pay(ctx context.Context, peer swarm.Address, amount *big.Int) error {
func (s *Service) Pay(ctx context.Context, peer swarm.Address, amount *big.Int) {
if s.payFunc != nil {
return s.payFunc(ctx, peer, amount)
s.payFunc(ctx, peer, amount)
return
}
if settlement, ok := s.settlementsSent[peer.String()]; ok {
s.settlementsSent[peer.String()] = big.NewInt(0).Add(settlement, amount)
} else {
s.settlementsSent[peer.String()] = amount
}
return nil
}
func (s *Service) SetNotifyPaymentFunc(f settlement.NotifyPaymentFunc) {
if s.setNotifyPaymentFunc != nil {
s.SetNotifyPaymentFunc(f)
}
}
// TotalSent is the mock TotalSent function of swap.
......
......@@ -47,32 +47,33 @@ type ApiInterface interface {
// Service is the implementation of the swap settlement layer.
type Service struct {
proto swapprotocol.Interface
logger logging.Logger
store storage.StateStorer
notifyPaymentFunc settlement.NotifyPaymentFunc
metrics metrics
chequebook chequebook.Service
chequeStore chequebook.ChequeStore
cashout chequebook.CashoutService
p2pService p2p.Service
addressbook Addressbook
networkID uint64
proto swapprotocol.Interface
logger logging.Logger
store storage.StateStorer
accountingAPI settlement.AccountingAPI
metrics metrics
chequebook chequebook.Service
chequeStore chequebook.ChequeStore
cashout chequebook.CashoutService
p2pService p2p.Service
addressbook Addressbook
networkID uint64
}
// New creates a new swap Service.
func New(proto swapprotocol.Interface, logger logging.Logger, store storage.StateStorer, chequebook chequebook.Service, chequeStore chequebook.ChequeStore, addressbook Addressbook, networkID uint64, cashout chequebook.CashoutService, p2pService p2p.Service) *Service {
func New(proto swapprotocol.Interface, logger logging.Logger, store storage.StateStorer, chequebook chequebook.Service, chequeStore chequebook.ChequeStore, addressbook Addressbook, networkID uint64, cashout chequebook.CashoutService, p2pService p2p.Service, accountingAPI settlement.AccountingAPI) *Service {
return &Service{
proto: proto,
logger: logger,
store: store,
metrics: newMetrics(),
chequebook: chequebook,
chequeStore: chequeStore,
addressbook: addressbook,
networkID: networkID,
cashout: cashout,
p2pService: p2pService,
proto: proto,
logger: logger,
store: store,
metrics: newMetrics(),
chequebook: chequebook,
chequeStore: chequeStore,
addressbook: addressbook,
networkID: networkID,
cashout: cashout,
p2pService: p2pService,
accountingAPI: accountingAPI,
}
}
......@@ -103,40 +104,46 @@ func (s *Service) ReceiveCheque(ctx context.Context, peer swarm.Address, cheque
s.metrics.TotalReceived.Add(float64(amount.Uint64()))
s.metrics.ChequesReceived.Inc()
return s.notifyPaymentFunc(peer, amount)
return s.accountingAPI.NotifyPaymentReceived(peer, amount)
}
// Pay initiates a payment to the given peer
func (s *Service) Pay(ctx context.Context, peer swarm.Address, amount *big.Int) error {
func (s *Service) Pay(ctx context.Context, peer swarm.Address, amount *big.Int) {
var err error
defer func() {
if err != nil {
s.accountingAPI.NotifyPaymentSent(peer, nil, err)
}
}()
beneficiary, known, err := s.addressbook.Beneficiary(peer)
if err != nil {
return err
return
}
if !known {
s.logger.Warningf("disconnecting non-swap peer %v", peer)
err = s.p2pService.Disconnect(peer)
if err != nil {
return err
return
}
return ErrUnknownBeneficary
err = ErrUnknownBeneficary
return
}
balance, err := s.chequebook.Issue(ctx, beneficiary, amount, func(signedCheque *chequebook.SignedCheque) error {
return s.proto.EmitCheque(ctx, peer, signedCheque)
})
if err != nil {
return err
return
}
bal, _ := big.NewFloat(0).SetInt(balance).Float64()
s.metrics.AvailableBalance.Set(bal)
s.accountingAPI.NotifyPaymentSent(peer, amount, nil)
amountFloat, _ := big.NewFloat(0).SetInt(amount).Float64()
s.metrics.TotalSent.Add(amountFloat)
s.metrics.ChequesSent.Inc()
return nil
}
// SetNotifyPaymentFunc sets the NotifyPaymentFunc to notify
func (s *Service) SetNotifyPaymentFunc(notifyPaymentFunc settlement.NotifyPaymentFunc) {
s.notifyPaymentFunc = notifyPaymentFunc
func (s *Service) SetAccountingAPI(accountingAPI settlement.AccountingAPI) {
s.accountingAPI = accountingAPI
}
// TotalSent returns the total amount sent to a peer
......
......@@ -10,6 +10,7 @@ import (
"io/ioutil"
"math/big"
"testing"
"time"
"github.com/ethereum/go-ethereum/common"
"github.com/ethersphere/bee/pkg/crypto"
......@@ -35,18 +36,48 @@ func (m *swapProtocolMock) EmitCheque(ctx context.Context, peer swarm.Address, c
}
type testObserver struct {
called bool
receivedCalled chan notifyPaymentReceivedCall
sentCalled chan notifyPaymentSentCall
}
type notifyPaymentReceivedCall struct {
peer swarm.Address
amount *big.Int
}
type notifyPaymentSentCall struct {
peer swarm.Address
amount *big.Int
err error
}
func newTestObserver() *testObserver {
return &testObserver{
receivedCalled: make(chan notifyPaymentReceivedCall, 1),
sentCalled: make(chan notifyPaymentSentCall, 1),
}
}
func (t *testObserver) PeerDebt(peer swarm.Address) (*big.Int, error) {
return nil, nil
}
func (t *testObserver) NotifyPayment(peer swarm.Address, amount *big.Int) error {
t.called = true
t.peer = peer
t.amount = amount
func (t *testObserver) NotifyPaymentReceived(peer swarm.Address, amount *big.Int) error {
t.receivedCalled <- notifyPaymentReceivedCall{
peer: peer,
amount: amount,
}
return nil
}
func (t *testObserver) NotifyPaymentSent(peer swarm.Address, amount *big.Int, err error) {
t.sentCalled <- notifyPaymentSentCall{
peer: peer,
amount: amount,
err: err,
}
}
type addressbookMock struct {
beneficiary func(peer swarm.Address) (beneficiary common.Address, known bool, err error)
chequebook func(peer swarm.Address) (chequebookAddress common.Address, known bool, err error)
......@@ -131,6 +162,8 @@ func TestReceiveCheque(t *testing.T) {
},
}
observer := newTestObserver()
swap := swap.New(
&swapProtocolMock{},
logger,
......@@ -141,27 +174,28 @@ func TestReceiveCheque(t *testing.T) {
networkID,
&cashoutMock{},
mockp2p.New(),
observer,
)
observer := &testObserver{}
swap.SetNotifyPaymentFunc(observer.NotifyPayment)
err := swap.ReceiveCheque(context.Background(), peer, cheque)
if err != nil {
t.Fatal(err)
}
if !observer.called {
t.Fatal("expected observer to be called")
}
select {
case call := <-observer.receivedCalled:
if call.amount.Cmp(amount) != 0 {
t.Fatalf("observer called with wrong amount. got %d, want %d", call.amount, amount)
}
if observer.amount.Cmp(amount) != 0 {
t.Fatalf("observer called with wrong amount. got %d, want %d", observer.amount, amount)
}
if !call.peer.Equal(peer) {
t.Fatalf("observer called with wrong peer. got %v, want %v", call.peer, peer)
}
if !observer.peer.Equal(peer) {
t.Fatalf("observer called with wrong peer. got %v, want %v", observer.peer, peer)
case <-time.After(time.Second):
t.Fatal("expected observer to be called")
}
}
func TestReceiveChequeReject(t *testing.T) {
......@@ -194,6 +228,8 @@ func TestReceiveChequeReject(t *testing.T) {
},
}
observer := newTestObserver()
swap := swap.New(
&swapProtocolMock{},
logger,
......@@ -204,11 +240,9 @@ func TestReceiveChequeReject(t *testing.T) {
networkID,
&cashoutMock{},
mockp2p.New(),
observer,
)
observer := &testObserver{}
swap.SetNotifyPaymentFunc(observer.NotifyPayment)
err := swap.ReceiveCheque(context.Background(), peer, cheque)
if err == nil {
t.Fatal("accepted invalid cheque")
......@@ -217,9 +251,12 @@ func TestReceiveChequeReject(t *testing.T) {
t.Fatalf("wrong error. wanted %v, got %v", errReject, err)
}
if observer.called {
t.Fatal("observer was be called for rejected payment")
select {
case <-observer.receivedCalled:
t.Fatalf("observer called by error.")
default:
}
}
func TestReceiveChequeWrongChequebook(t *testing.T) {
......@@ -246,6 +283,7 @@ func TestReceiveChequeWrongChequebook(t *testing.T) {
},
}
observer := newTestObserver()
swapService := swap.New(
&swapProtocolMock{},
logger,
......@@ -256,11 +294,9 @@ func TestReceiveChequeWrongChequebook(t *testing.T) {
networkID,
&cashoutMock{},
mockp2p.New(),
observer,
)
observer := &testObserver{}
swapService.SetNotifyPaymentFunc(observer.NotifyPayment)
err := swapService.ReceiveCheque(context.Background(), peer, cheque)
if err == nil {
t.Fatal("accepted invalid cheque")
......@@ -269,9 +305,12 @@ func TestReceiveChequeWrongChequebook(t *testing.T) {
t.Fatalf("wrong error. wanted %v, got %v", swap.ErrWrongChequebook, err)
}
if observer.called {
t.Fatal("observer was be called for rejected payment")
select {
case <-observer.receivedCalled:
t.Fatalf("observer called by error.")
default:
}
}
func TestPay(t *testing.T) {
......@@ -307,6 +346,8 @@ func TestPay(t *testing.T) {
},
}
observer := newTestObserver()
var emitCalled bool
swap := swap.New(
&swapProtocolMock{
......@@ -329,12 +370,10 @@ func TestPay(t *testing.T) {
networkID,
&cashoutMock{},
mockp2p.New(),
observer,
)
err := swap.Pay(context.Background(), peer, amount)
if err != nil {
t.Fatal(err)
}
swap.Pay(context.Background(), peer, amount)
if !chequebookCalled {
t.Fatal("chequebook was not called")
......@@ -380,12 +419,27 @@ func TestPayIssueError(t *testing.T) {
networkID,
&cashoutMock{},
mockp2p.New(),
nil,
)
err := swap.Pay(context.Background(), peer, amount)
if !errors.Is(err, errReject) {
t.Fatalf("wrong error. wanted %v, got %v", errReject, err)
observer := newTestObserver()
swap.SetAccountingAPI(observer)
swap.Pay(context.Background(), peer, amount)
select {
case call := <-observer.sentCalled:
if !call.peer.Equal(peer) {
t.Fatalf("observer called with wrong peer. got %v, want %v", call.peer, peer)
}
if !errors.Is(call.err, errReject) {
t.Fatalf("wrong error. wanted %v, got %v", errReject, call.err)
}
case <-time.After(time.Second):
t.Fatal("expected observer to be called")
}
}
func TestPayUnknownBeneficiary(t *testing.T) {
......@@ -404,6 +458,8 @@ func TestPayUnknownBeneficiary(t *testing.T) {
},
}
observer := newTestObserver()
var disconnectCalled bool
swapService := swap.New(
&swapProtocolMock{},
......@@ -423,11 +479,23 @@ func TestPayUnknownBeneficiary(t *testing.T) {
return nil
}),
),
observer,
)
err := swapService.Pay(context.Background(), peer, amount)
if !errors.Is(err, swap.ErrUnknownBeneficary) {
t.Fatalf("wrong error. wanted %v, got %v", swap.ErrUnknownBeneficary, err)
swapService.Pay(context.Background(), peer, amount)
select {
case call := <-observer.sentCalled:
if !call.peer.Equal(peer) {
t.Fatalf("observer called with wrong peer. got %v, want %v", call.peer, peer)
}
if !errors.Is(call.err, swap.ErrUnknownBeneficary) {
t.Fatalf("wrong error. wanted %v, got %v", swap.ErrUnknownBeneficary, call.err)
}
case <-time.After(time.Second):
t.Fatal("expected observer to be called")
}
if !disconnectCalled {
......@@ -462,6 +530,7 @@ func TestHandshake(t *testing.T) {
networkID,
&cashoutMock{},
mockp2p.New(),
nil,
)
err := swapService.Handshake(peer, beneficiary)
......@@ -501,6 +570,7 @@ func TestHandshakeNewPeer(t *testing.T) {
networkID,
&cashoutMock{},
mockp2p.New(),
nil,
)
err := swapService.Handshake(peer, beneficiary)
......@@ -531,6 +601,7 @@ func TestHandshakeWrongBeneficiary(t *testing.T) {
networkID,
&cashoutMock{},
mockp2p.New(),
nil,
)
err := swapService.Handshake(peer, beneficiary)
......@@ -580,6 +651,7 @@ func TestCashout(t *testing.T) {
},
},
mockp2p.New(),
nil,
)
returnedHash, err := swapService.CashCheque(context.Background(), peer)
......@@ -626,6 +698,7 @@ func TestCashoutStatus(t *testing.T) {
},
},
mockp2p.New(),
nil,
)
returnedStatus, err := swapService.CashoutStatus(context.Background(), peer)
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment