Commit 9e237f07 authored by metacertain's avatar metacertain Committed by GitHub

Surplus balances for peers (#870)

* Oversettle in debit and payment receive
* Surplus balances for peers, debug API addons (#972)
Additional helper endpoints for integration tests and overdraft safety for surpluses, Openapi update, Fixes
parent 27e96b0e
...@@ -42,7 +42,7 @@ paths: ...@@ -42,7 +42,7 @@ paths:
'/balances': '/balances':
get: get:
summary: Get the balances with all known peers summary: Get the balances with all known peers including prepaid services
tags: tags:
- Balance - Balance
responses: responses:
...@@ -59,7 +59,7 @@ paths: ...@@ -59,7 +59,7 @@ paths:
'/balances/{address}': '/balances/{address}':
get: get:
summary: Get the balances with a specific peer summary: Get the balances with a specific peer including prepaid services
tags: tags:
- Balance - Balance
parameters: parameters:
...@@ -71,7 +71,50 @@ paths: ...@@ -71,7 +71,50 @@ paths:
description: Swarm address of peer description: Swarm address of peer
responses: responses:
'200': '200':
description: Peer is known description: Balance with the specific peer
content:
application/json:
schema:
$ref: 'SwarmCommon.yaml#/components/schemas/Balance'
'404':
$ref: 'SwarmCommon.yaml#/components/responses/404'
'500':
$ref: 'SwarmCommon.yaml#/components/responses/500'
default:
description: Default response
'/consumed':
get:
summary: Get the past due consumption balances with all known peers
tags:
- Balance
responses:
'200':
description: Own past due consumption balances with all known peers
content:
application/json:
schema:
$ref: 'SwarmCommon.yaml#/components/schemas/Balances'
'500':
$ref: 'SwarmCommon.yaml#/components/responses/500'
default:
description: Default response
'/consumed/{address}':
get:
summary: Get the past due consumption balance with a specific peer
tags:
- Balance
parameters:
- in: path
name: address
schema:
$ref: 'SwarmCommon.yaml#/components/schemas/SwarmAddress'
required: true
description: Swarm address of peer
responses:
'200':
description: Past-due consumption balance with the specific peer
content: content:
application/json: application/json:
schema: schema:
......
...@@ -22,8 +22,9 @@ import ( ...@@ -22,8 +22,9 @@ import (
) )
var ( var (
_ Interface = (*Accounting)(nil) _ Interface = (*Accounting)(nil)
balancesPrefix string = "balance_" balancesPrefix string = "balance_"
balancesSurplusPrefix string = "surplusbalance_"
) )
// Interface is the Accounting interface. // Interface is the Accounting interface.
...@@ -42,8 +43,14 @@ type Interface interface { ...@@ -42,8 +43,14 @@ type Interface interface {
Debit(peer swarm.Address, price uint64) error Debit(peer swarm.Address, price uint64) error
// Balance returns the current balance for the given peer. // Balance returns the current balance for the given peer.
Balance(peer swarm.Address) (int64, error) Balance(peer swarm.Address) (int64, error)
// SurplusBalance returns the current surplus balance for the given peer.
SurplusBalance(peer swarm.Address) (int64, error)
// Balances returns balances for all known peers. // Balances returns balances for all known peers.
Balances() (map[string]int64, error) Balances() (map[string]int64, error)
// CompensatedBalance returns the current balance deducted by current surplus balance for the given peer.
CompensatedBalance(peer swarm.Address) (int64, error)
// CompensatedBalances returns the compensated balances for all known peers.
CompensatedBalances() (map[string]int64, error)
} }
// accountingPeer holds all in-memory accounting information for one peer. // accountingPeer holds all in-memory accounting information for one peer.
...@@ -83,6 +90,8 @@ var ( ...@@ -83,6 +90,8 @@ var (
ErrPeerNoBalance = errors.New("no balance for peer") ErrPeerNoBalance = errors.New("no balance for peer")
// ErrOverflow denotes an arithmetic operation overflowed. // ErrOverflow denotes an arithmetic operation overflowed.
ErrOverflow = errors.New("overflow error") ErrOverflow = errors.New("overflow error")
// ErrInvalidValue denotes an invalid value read from store
ErrInvalidValue = errors.New("invalid value")
) )
// NewAccounting creates a new Accounting instance with the provided options. // NewAccounting creates a new Accounting instance with the provided options.
...@@ -160,10 +169,25 @@ func (a *Accounting) Reserve(ctx context.Context, peer swarm.Address, price uint ...@@ -160,10 +169,25 @@ func (a *Accounting) Reserve(ctx context.Context, peer swarm.Address, price uint
threshold = 0 threshold = 0
} }
additionalDebt, err := a.SurplusBalance(peer)
if err != nil {
return fmt.Errorf("failed to load surplus balance: %w", err)
}
// uint64 conversion of surplusbalance is safe because surplusbalance is always positive
if additionalDebt < 0 {
return ErrInvalidValue
}
increasedExpectedDebt, err := addI64pU64(expectedDebt, uint64(additionalDebt))
if err != nil {
return err
}
// If our expected debt is less than earlyPayment away from our payment threshold // If our expected debt is less than earlyPayment away from our payment threshold
// and we are actually in debt, trigger settlement. // and we are actually in debt, trigger settlement.
// we pay early to avoid needlessly blocking request later when concurrent requests occur and we are already close to the payment threshold. // we pay early to avoid needlessly blocking request later when concurrent requests occur and we are already close to the payment threshold.
if expectedDebt >= int64(threshold) && currentBalance < 0 { if increasedExpectedDebt >= int64(threshold) && currentBalance < 0 {
err = a.settle(ctx, peer, accountingPeer) err = a.settle(ctx, peer, accountingPeer)
if err != nil { if err != nil {
return fmt.Errorf("failed to settle with peer %v: %v", peer, err) return fmt.Errorf("failed to settle with peer %v: %v", peer, err)
...@@ -171,11 +195,15 @@ func (a *Accounting) Reserve(ctx context.Context, peer swarm.Address, price uint ...@@ -171,11 +195,15 @@ func (a *Accounting) Reserve(ctx context.Context, peer swarm.Address, price uint
// if we settled successfully our balance is back at 0 // if we settled successfully our balance is back at 0
// and the expected debt therefore equals next reserved amount // and the expected debt therefore equals next reserved amount
expectedDebt = int64(nextReserved) expectedDebt = int64(nextReserved)
increasedExpectedDebt, err = addI64pU64(expectedDebt, uint64(additionalDebt))
if err != nil {
return err
}
} }
// if expectedDebt would still exceed the paymentThreshold at this point block this request // if expectedDebt would still exceed the paymentThreshold at this point block this request
// this can happen if there is a large number of concurrent requests to the same peer // this can happen if there is a large number of concurrent requests to the same peer
if expectedDebt > int64(a.paymentThreshold) { if increasedExpectedDebt > int64(a.paymentThreshold) {
a.metrics.AccountingBlocksCount.Inc() a.metrics.AccountingBlocksCount.Inc()
return ErrOverdraft return ErrOverdraft
} }
...@@ -299,6 +327,57 @@ func (a *Accounting) Debit(peer swarm.Address, price uint64) error { ...@@ -299,6 +327,57 @@ func (a *Accounting) Debit(peer swarm.Address, price uint64) error {
accountingPeer.lock.Lock() accountingPeer.lock.Lock()
defer accountingPeer.lock.Unlock() defer accountingPeer.lock.Unlock()
cost := price
// see if peer has surplus balance to deduct this transaction of
surplusBalance, err := a.SurplusBalance(peer)
if err != nil {
return fmt.Errorf("failed to get surplus balance: %w", err)
}
if surplusBalance > 0 {
// get new surplus balance after deduct
newSurplusBalance, err := subtractI64mU64(surplusBalance, price)
if err != nil {
return err
}
// if nothing left for debiting, store new surplus balance and return from debit
if newSurplusBalance >= 0 {
a.logger.Tracef("surplus debiting peer %v with value %d, new surplus balance is %d", peer, price, newSurplusBalance)
err = a.store.Put(peerSurplusBalanceKey(peer), newSurplusBalance)
if err != nil {
return fmt.Errorf("failed to persist surplus balance: %w", err)
}
// count debit operations, terminate early
a.metrics.TotalDebitedAmount.Add(float64(price))
a.metrics.DebitEventsCount.Inc()
return nil
}
// if surplus balance didn't cover full transaction, let's continue with leftover part as cost
debitIncrease, err := subtractU64mI64(price, surplusBalance)
if err != nil {
return err
}
// conversion to uint64 is safe because we know the relationship between the values by now, but let's make a sanity check
if debitIncrease <= 0 {
return fmt.Errorf("sanity check failed for partial debit after surplus balance drawn")
}
cost = uint64(debitIncrease)
// if we still have something to debit, than have run out of surplus balance,
// let's store 0 as surplus balance
a.logger.Tracef("surplus debiting peer %v with value %d, new surplus balance is 0", peer, debitIncrease)
err = a.store.Put(peerSurplusBalanceKey(peer), 0)
if err != nil {
return fmt.Errorf("failed to persist surplus balance: %w", err)
}
}
currentBalance, err := a.Balance(peer) currentBalance, err := a.Balance(peer)
if err != nil { if err != nil {
if !errors.Is(err, ErrPeerNoBalance) { if !errors.Is(err, ErrPeerNoBalance) {
...@@ -307,7 +386,7 @@ func (a *Accounting) Debit(peer swarm.Address, price uint64) error { ...@@ -307,7 +386,7 @@ func (a *Accounting) Debit(peer swarm.Address, price uint64) error {
} }
// Get nextBalance by safely increasing current balance with price // Get nextBalance by safely increasing current balance with price
nextBalance, err := addI64pU64(currentBalance, price) nextBalance, err := addI64pU64(currentBalance, cost)
if err != nil { if err != nil {
return err return err
} }
...@@ -345,11 +424,52 @@ func (a *Accounting) Balance(peer swarm.Address) (balance int64, err error) { ...@@ -345,11 +424,52 @@ func (a *Accounting) Balance(peer swarm.Address) (balance int64, err error) {
return balance, nil return balance, nil
} }
// SurplusBalance returns the current balance for the given peer.
func (a *Accounting) SurplusBalance(peer swarm.Address) (balance int64, err error) {
err = a.store.Get(peerSurplusBalanceKey(peer), &balance)
if err != nil {
if errors.Is(err, storage.ErrNotFound) {
return 0, nil
}
return 0, err
}
return balance, nil
}
// CompensatedBalance returns balance decreased by surplus balance
func (a *Accounting) CompensatedBalance(peer swarm.Address) (compensated int64, err error) {
balance, err := a.Balance(peer)
if err != nil {
return 0, err
}
surplus, err := a.SurplusBalance(peer)
if err != nil {
return 0, err
}
if surplus < 0 {
return 0, ErrInvalidValue
}
// Compensated balance is balance decreased by surplus balance
compensated, err = subtractI64mU64(balance, uint64(surplus))
if err != nil {
return 0, err
}
return compensated, nil
}
// peerBalanceKey returns the balance storage key for the given peer. // peerBalanceKey returns the balance storage key for the given peer.
func peerBalanceKey(peer swarm.Address) string { func peerBalanceKey(peer swarm.Address) string {
return fmt.Sprintf("%s%s", balancesPrefix, peer.String()) return fmt.Sprintf("%s%s", balancesPrefix, peer.String())
} }
// peerSurplusBalanceKey returns the surplus balance storage key for the given peer
func peerSurplusBalanceKey(peer swarm.Address) string {
return fmt.Sprintf("%s%s", balancesSurplusPrefix, peer.String())
}
// getAccountingPeer returns the accountingPeer for a given swarm address. // getAccountingPeer returns the accountingPeer for a given swarm address.
// If not found in memory it will initialize it. // If not found in memory it will initialize it.
func (a *Accounting) getAccountingPeer(peer swarm.Address) (*accountingPeer, error) { func (a *Accounting) getAccountingPeer(peer swarm.Address) (*accountingPeer, error) {
...@@ -399,6 +519,55 @@ func (a *Accounting) Balances() (map[string]int64, error) { ...@@ -399,6 +519,55 @@ func (a *Accounting) Balances() (map[string]int64, error) {
return s, nil return s, nil
} }
// Balances gets balances for all peers from store.
func (a *Accounting) CompensatedBalances() (map[string]int64, error) {
s := make(map[string]int64)
err := a.store.Iterate(balancesPrefix, func(key, val []byte) (stop bool, err error) {
addr, err := balanceKeyPeer(key)
if err != nil {
return false, fmt.Errorf("parse address from key: %s: %v", string(key), err)
}
if _, ok := s[addr.String()]; !ok {
value, err := a.CompensatedBalance(addr)
if err != nil {
return false, fmt.Errorf("get peer %s balance: %v", addr.String(), err)
}
s[addr.String()] = value
}
return false, nil
})
if err != nil {
return nil, err
}
err = a.store.Iterate(balancesSurplusPrefix, func(key, val []byte) (stop bool, err error) {
addr, err := surplusBalanceKeyPeer(key)
if err != nil {
return false, fmt.Errorf("parse address from key: %s: %v", string(key), err)
}
if _, ok := s[addr.String()]; !ok {
value, err := a.CompensatedBalance(addr)
if err != nil {
return false, fmt.Errorf("get peer %s balance: %v", addr.String(), err)
}
s[addr.String()] = value
}
return false, nil
})
if err != nil {
return nil, err
}
return s, nil
}
// balanceKeyPeer returns the embedded peer from the balance storage key. // balanceKeyPeer returns the embedded peer from the balance storage key.
func balanceKeyPeer(key []byte) (swarm.Address, error) { func balanceKeyPeer(key []byte) (swarm.Address, error) {
k := string(key) k := string(key)
...@@ -416,6 +585,22 @@ func balanceKeyPeer(key []byte) (swarm.Address, error) { ...@@ -416,6 +585,22 @@ func balanceKeyPeer(key []byte) (swarm.Address, error) {
return addr, nil return addr, nil
} }
func surplusBalanceKeyPeer(key []byte) (swarm.Address, error) {
k := string(key)
split := strings.SplitAfter(k, balancesSurplusPrefix)
if len(split) != 2 {
return swarm.ZeroAddress, errors.New("no peer in key")
}
addr, err := swarm.ParseHexAddress(split[1])
if err != nil {
return swarm.ZeroAddress, err
}
return addr, nil
}
// NotifyPayment implements the PaymentObserver interface. It is called by // NotifyPayment implements the PaymentObserver interface. It is called by
// Settlement when we receive a payment. // Settlement when we receive a payment.
func (a *Accounting) NotifyPayment(peer swarm.Address, amount uint64) error { func (a *Accounting) NotifyPayment(peer swarm.Address, amount uint64) error {
...@@ -432,18 +617,41 @@ func (a *Accounting) NotifyPayment(peer swarm.Address, amount uint64) error { ...@@ -432,18 +617,41 @@ func (a *Accounting) NotifyPayment(peer swarm.Address, amount uint64) error {
if !errors.Is(err, ErrPeerNoBalance) { if !errors.Is(err, ErrPeerNoBalance) {
return err return err
} }
} }
// if balance is already negative or zero, we credit full amount received to surplus balance and terminate early
if currentBalance <= 0 {
surplus, err := a.SurplusBalance(peer)
if err != nil {
return fmt.Errorf("failed to get surplus balance: %w", err)
}
increasedSurplus, err := addI64pU64(surplus, amount)
if err != nil {
return err
}
a.logger.Tracef("surplus crediting peer %v with amount %d due to payment, new surplus balance is %d", peer, amount, increasedSurplus)
nextBalance, err := subtractI64mU64(currentBalance, amount) err = a.store.Put(peerSurplusBalanceKey(peer), increasedSurplus)
if err != nil {
return fmt.Errorf("failed to persist surplus balance: %w", err)
}
return nil
}
// if current balance is positive, let's make a partial credit to
newBalance, err := subtractI64mU64(currentBalance, amount)
if err != nil { if err != nil {
return err return err
} }
// Don't allow a payment to put use more into debt than the tolerance. // Don't allow a payment to put us into debt
// This is to prevent another node tricking us into settling by settling // This is to prevent another node tricking us into settling by settling
// first (e.g. send a bouncing cheque to trigger an honest cheque in swap). // first (e.g. send a bouncing cheque to trigger an honest cheque in swap).
if nextBalance < -int64(a.paymentTolerance) { nextBalance := newBalance
return fmt.Errorf("refusing to accept payment which would put us too much in debt, new balance would have been %d", nextBalance) if newBalance < 0 {
nextBalance = 0
} }
a.logger.Tracef("crediting peer %v with amount %d due to payment, new balance is %d", peer, amount, nextBalance) a.logger.Tracef("crediting peer %v with amount %d due to payment, new balance is %d", peer, amount, nextBalance)
...@@ -453,6 +661,32 @@ func (a *Accounting) NotifyPayment(peer swarm.Address, amount uint64) error { ...@@ -453,6 +661,32 @@ func (a *Accounting) NotifyPayment(peer swarm.Address, amount uint64) error {
return fmt.Errorf("failed to persist balance: %w", err) return fmt.Errorf("failed to persist balance: %w", err)
} }
// If payment would have put us into debt, rather, let's add to surplusBalance,
// so as that an oversettlement attempt creates balance for future forwarding services
// charges to be deducted of
if newBalance < 0 {
surplusGrowth, err := subtractU64mI64(amount, currentBalance)
if err != nil {
return err
}
surplus, err := a.SurplusBalance(peer)
if err != nil {
return fmt.Errorf("failed to get surplus balance: %w", err)
}
increasedSurplus := surplus + surplusGrowth
if increasedSurplus < surplus {
return ErrOverflow
}
a.logger.Tracef("surplus crediting peer %v with amount %d due to payment, new surplus balance is %d", peer, surplusGrowth, increasedSurplus)
err = a.store.Put(peerSurplusBalanceKey(peer), increasedSurplus)
if err != nil {
return fmt.Errorf("failed to persist surplus balance: %w", err)
}
}
return nil return nil
} }
...@@ -482,6 +716,22 @@ func subtractI64mU64(base int64, subtracted uint64) (result int64, err error) { ...@@ -482,6 +716,22 @@ func subtractI64mU64(base int64, subtracted uint64) (result int64, err error) {
return result, nil return result, nil
} }
func subtractU64mI64(base uint64, subtracted int64) (result int64, err error) {
if base > math.MaxInt64 {
return 0, ErrOverflow
}
// base is positive, overflow can happen by subtracting negative number
result = int64(base) - subtracted
if subtracted < 0 {
if result < int64(base) {
return 0, ErrOverflow
}
}
return result, nil
}
// addI64pU64 is a helper function for safe addition of Int64 + Uint64 // addI64pU64 is a helper function for safe addition of Int64 + Uint64
// It checks for // It checks for
// - overflow safety in conversion of uint64 to int64 // - overflow safety in conversion of uint64 to int64
......
...@@ -225,7 +225,7 @@ func TestAccountingOverflowReserve(t *testing.T) { ...@@ -225,7 +225,7 @@ func TestAccountingOverflowReserve(t *testing.T) {
} }
} }
func TestAccountingOverflowNotifyPayment(t *testing.T) { func TestAccountingOverflowSurplusBalance(t *testing.T) {
logger := logging.New(ioutil.Discard, 0) logger := logging.New(ioutil.Discard, 0)
store := mock.NewStateStore() store := mock.NewStateStore()
...@@ -241,11 +241,32 @@ func TestAccountingOverflowNotifyPayment(t *testing.T) { ...@@ -241,11 +241,32 @@ func TestAccountingOverflowNotifyPayment(t *testing.T) {
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
// Try Crediting a large amount to peer so balance is negative // Try Debiting a large amount to peer so balance is large positive
err = acc.Credit(peer1Addr, math.MaxInt64) err = acc.Debit(peer1Addr, testPaymentThresholdLarge-1)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
// Notify of incoming payment from same peer, so balance goes to 0 with surplusbalance 2
err = acc.NotifyPayment(peer1Addr, math.MaxInt64)
if err != nil {
t.Fatal("Unexpected overflow from NotifyPayment")
}
// sanity check surplus balance
val, err := acc.SurplusBalance(peer1Addr)
if err != nil {
t.Fatal("Error checking Surplusbalance")
}
if val != 2 {
t.Fatal("Not expected surplus balance")
}
// sanity check balance
val, err = acc.Balance(peer1Addr)
if err != nil {
t.Fatal("Error checking Balance")
}
if val != 0 {
t.Fatal("Unexpected balance")
}
// Notify of incoming payment from same peer, further decreasing balance, this should overflow // Notify of incoming payment from same peer, further decreasing balance, this should overflow
err = acc.NotifyPayment(peer1Addr, math.MaxInt64) err = acc.NotifyPayment(peer1Addr, math.MaxInt64)
if err == nil { if err == nil {
...@@ -253,10 +274,44 @@ func TestAccountingOverflowNotifyPayment(t *testing.T) { ...@@ -253,10 +274,44 @@ func TestAccountingOverflowNotifyPayment(t *testing.T) {
} }
// If we had other error, assert fail // If we had other error, assert fail
if !errors.Is(err, accounting.ErrOverflow) { if !errors.Is(err, accounting.ErrOverflow) {
t.Fatalf("expected overflow error from Debit, got %v", err) t.Fatal("Expected overflow error from NotifyPayment")
} }
} }
func TestAccountingOverflowNotifyPayment(t *testing.T) {
logger := logging.New(ioutil.Discard, 0)
store := mock.NewStateStore()
defer store.Close()
settlement := mockSettlement.NewSettlement()
acc, err := accounting.NewAccounting(testPaymentThresholdLarge, 0, 0, logger, store, settlement, nil)
if err != nil {
t.Fatal(err)
}
peer1Addr, err := swarm.ParseHexAddress("00112233")
if err != nil {
t.Fatal(err)
}
// Try Crediting a large amount to peer so balance is negative
err = acc.Credit(peer1Addr, math.MaxInt64)
if err != nil {
t.Fatal(err)
}
// NotifyPayment for peer should now fill the surplus balance
err = acc.NotifyPayment(peer1Addr, math.MaxInt64)
if err != nil {
t.Fatalf("Expected no error but got one: %v", err)
}
// Notify of incoming payment from same peer, further increasing the surplus balance into an overflow
err = acc.NotifyPayment(peer1Addr, 1)
if !errors.Is(err, accounting.ErrOverflow) {
t.Fatalf("expected overflow error from Debit, got %v", err)
}
}
func TestAccountingOverflowDebit(t *testing.T) { func TestAccountingOverflowDebit(t *testing.T) {
logger := logging.New(ioutil.Discard, 0) logger := logging.New(ioutil.Discard, 0)
...@@ -528,6 +583,113 @@ func TestAccountingCallSettlementEarly(t *testing.T) { ...@@ -528,6 +583,113 @@ func TestAccountingCallSettlementEarly(t *testing.T) {
} }
} }
func TestAccountingSurplusBalance(t *testing.T) {
logger := logging.New(ioutil.Discard, 0)
store := mock.NewStateStore()
defer store.Close()
settlement := mockSettlement.NewSettlement()
acc, err := accounting.NewAccounting(testPaymentThreshold, 0, 0, logger, store, settlement, nil)
if err != nil {
t.Fatal(err)
}
peer1Addr, err := swarm.ParseHexAddress("00112233")
if err != nil {
t.Fatal(err)
}
// Try Debiting a large amount to peer so balance is large positive
err = acc.Debit(peer1Addr, testPaymentThreshold-1)
if err != nil {
t.Fatal(err)
}
// Notify of incoming payment from same peer, so balance goes to 0 with surplusbalance 2
err = acc.NotifyPayment(peer1Addr, testPaymentThreshold+1)
if err != nil {
t.Fatal("Unexpected overflow from doable NotifyPayment")
}
//sanity check surplus balance
val, err := acc.SurplusBalance(peer1Addr)
if err != nil {
t.Fatal("Error checking Surplusbalance")
}
if val != 2 {
t.Fatal("Not expected surplus balance")
}
//sanity check balance
val, err = acc.Balance(peer1Addr)
if err != nil {
t.Fatal("Error checking Balance")
}
if val != 0 {
t.Fatal("Not expected balance")
}
// Notify of incoming payment from same peer, so balance goes to 0 with surplusbalance 10002 (testpaymentthreshold+2)
err = acc.NotifyPayment(peer1Addr, testPaymentThreshold)
if err != nil {
t.Fatal("Unexpected error from NotifyPayment")
}
//sanity check surplus balance
val, err = acc.SurplusBalance(peer1Addr)
if err != nil {
t.Fatal("Error checking Surplusbalance")
}
if val != testPaymentThreshold+2 {
t.Fatal("Unexpected surplus balance")
}
//sanity check balance
val, err = acc.Balance(peer1Addr)
if err != nil {
t.Fatal("Error checking Balance")
}
if val != 0 {
t.Fatal("Not expected balance, expected 0")
}
// Debit for same peer, so balance stays 0 with surplusbalance decreasing to 2
err = acc.Debit(peer1Addr, testPaymentThreshold)
if err != nil {
t.Fatal("Unexpected error from Credit")
}
// samity check surplus balance
val, err = acc.SurplusBalance(peer1Addr)
if err != nil {
t.Fatal("Error checking Surplusbalance")
}
if val != 2 {
t.Fatal("Unexpected surplus balance")
}
//sanity check balance
val, err = acc.Balance(peer1Addr)
if err != nil {
t.Fatal("Error checking Balance")
}
if val != 0 {
t.Fatal("Not expected balance, expected 0")
}
// Debit for same peer, so balance goes to 9998 (testpaymentthreshold - 2) with surplusbalance decreasing to 0
err = acc.Debit(peer1Addr, testPaymentThreshold)
if err != nil {
t.Fatal("Unexpected error from NotifyPayment")
}
// samity check surplus balance
val, err = acc.SurplusBalance(peer1Addr)
if err != nil {
t.Fatal("Error checking Surplusbalance")
}
if val != 0 {
t.Fatal("Unexpected surplus balance")
}
//sanity check balance
val, err = acc.Balance(peer1Addr)
if err != nil {
t.Fatal("Error checking Balance")
}
if val != testPaymentThreshold-2 {
t.Fatal("Not expected balance, expected 0")
}
}
// TestAccountingNotifyPayment tests that payments adjust the balance and payment which put us into debt are rejected // TestAccountingNotifyPayment tests that payments adjust the balance and payment which put us into debt are rejected
func TestAccountingNotifyPayment(t *testing.T) { func TestAccountingNotifyPayment(t *testing.T) {
logger := logging.New(ioutil.Discard, 0) logger := logging.New(ioutil.Discard, 0)
...@@ -562,8 +724,8 @@ func TestAccountingNotifyPayment(t *testing.T) { ...@@ -562,8 +724,8 @@ func TestAccountingNotifyPayment(t *testing.T) {
} }
err = acc.NotifyPayment(peer1Addr, debtAmount+testPaymentTolerance+1) err = acc.NotifyPayment(peer1Addr, debtAmount+testPaymentTolerance+1)
if err == nil { if err != nil {
t.Fatal("expected payment to be rejected") t.Fatal(err)
} }
} }
......
...@@ -14,14 +14,18 @@ import ( ...@@ -14,14 +14,18 @@ import (
// Service is the mock Accounting service. // Service is the mock Accounting service.
type Service struct { type Service struct {
lock sync.Mutex lock sync.Mutex
balances map[string]int64 balances map[string]int64
reserveFunc func(ctx context.Context, peer swarm.Address, price uint64) error reserveFunc func(ctx context.Context, peer swarm.Address, price uint64) error
releaseFunc func(peer swarm.Address, price uint64) releaseFunc func(peer swarm.Address, price uint64)
creditFunc func(peer swarm.Address, price uint64) error creditFunc func(peer swarm.Address, price uint64) error
debitFunc func(peer swarm.Address, price uint64) error debitFunc func(peer swarm.Address, price uint64) error
balanceFunc func(swarm.Address) (int64, error) balanceFunc func(swarm.Address) (int64, error)
balancesFunc func() (map[string]int64, error) balancesFunc func() (map[string]int64, error)
compensatedBalanceFunc func(swarm.Address) (int64, error)
compensatedBalancesFunc func() (map[string]int64, error)
balanceSurplusFunc func(swarm.Address) (int64, error)
} }
// WithReserveFunc sets the mock Reserve function // WithReserveFunc sets the mock Reserve function
...@@ -66,6 +70,27 @@ func WithBalancesFunc(f func() (map[string]int64, error)) Option { ...@@ -66,6 +70,27 @@ func WithBalancesFunc(f func() (map[string]int64, error)) Option {
}) })
} }
// WithCompensatedBalanceFunc sets the mock Balance function
func WithCompensatedBalanceFunc(f func(swarm.Address) (int64, error)) Option {
return optionFunc(func(s *Service) {
s.compensatedBalanceFunc = f
})
}
// WithCompensatedBalancesFunc sets the mock Balances function
func WithCompensatedBalancesFunc(f func() (map[string]int64, error)) Option {
return optionFunc(func(s *Service) {
s.compensatedBalancesFunc = f
})
}
// WithBalanceSurplusFunc sets the mock SurplusBalance function
func WithBalanceSurplusFunc(f func(swarm.Address) (int64, error)) Option {
return optionFunc(func(s *Service) {
s.balanceSurplusFunc = f
})
}
// NewAccounting creates the mock accounting implementation // NewAccounting creates the mock accounting implementation
func NewAccounting(opts ...Option) accounting.Interface { func NewAccounting(opts ...Option) accounting.Interface {
mock := new(Service) mock := new(Service)
...@@ -131,6 +156,34 @@ func (s *Service) Balances() (map[string]int64, error) { ...@@ -131,6 +156,34 @@ func (s *Service) Balances() (map[string]int64, error) {
return s.balances, nil return s.balances, nil
} }
// CompensatedBalance is the mock function wrapper that calls the set implementation
func (s *Service) CompensatedBalance(peer swarm.Address) (int64, error) {
if s.compensatedBalanceFunc != nil {
return s.compensatedBalanceFunc(peer)
}
s.lock.Lock()
defer s.lock.Unlock()
return s.balances[peer.String()], nil
}
// CompensatedBalances is the mock function wrapper that calls the set implementation
func (s *Service) CompensatedBalances() (map[string]int64, error) {
if s.compensatedBalancesFunc != nil {
return s.compensatedBalancesFunc()
}
return s.balances, nil
}
//
func (s *Service) SurplusBalance(peer swarm.Address) (int64, error) {
if s.balanceFunc != nil {
return s.balanceSurplusFunc(peer)
}
s.lock.Lock()
defer s.lock.Unlock()
return 0, nil
}
// Option is the option passed to the mock accounting service // Option is the option passed to the mock accounting service
type Option interface { type Option interface {
apply(*Service) apply(*Service)
......
...@@ -79,3 +79,53 @@ func (s *server) peerBalanceHandler(w http.ResponseWriter, r *http.Request) { ...@@ -79,3 +79,53 @@ func (s *server) peerBalanceHandler(w http.ResponseWriter, r *http.Request) {
Balance: balance, Balance: balance,
}) })
} }
func (s *server) compensatedBalancesHandler(w http.ResponseWriter, r *http.Request) {
balances, err := s.Accounting.CompensatedBalances()
if err != nil {
jsonhttp.InternalServerError(w, errCantBalances)
s.Logger.Debugf("debug api: compensated balances: %v", err)
s.Logger.Error("debug api: can not get compensated balances")
return
}
balResponses := make([]balanceResponse, len(balances))
i := 0
for k := range balances {
balResponses[i] = balanceResponse{
Peer: k,
Balance: balances[k],
}
i++
}
jsonhttp.OK(w, balancesResponse{Balances: balResponses})
}
func (s *server) compensatedPeerBalanceHandler(w http.ResponseWriter, r *http.Request) {
addr := mux.Vars(r)["peer"]
peer, err := swarm.ParseHexAddress(addr)
if err != nil {
s.Logger.Debugf("debug api: compensated balances peer: invalid peer address %s: %v", addr, err)
s.Logger.Errorf("debug api: compensated balances peer: invalid peer address %s", addr)
jsonhttp.NotFound(w, errInvaliAddress)
return
}
balance, err := s.Accounting.CompensatedBalance(peer)
if err != nil {
if errors.Is(err, accounting.ErrPeerNoBalance) {
jsonhttp.NotFound(w, errNoBalance)
return
}
s.Logger.Debugf("debug api: compensated balances peer: get peer %s balance: %v", peer.String(), err)
s.Logger.Errorf("debug api: compensated balances peer: can't get peer %s balance", peer.String())
jsonhttp.InternalServerError(w, errCantBalance)
return
}
jsonhttp.OK(w, balanceResponse{
Peer: peer.String(),
Balance: balance,
})
}
...@@ -19,7 +19,7 @@ import ( ...@@ -19,7 +19,7 @@ import (
) )
func TestBalances(t *testing.T) { func TestBalances(t *testing.T) {
balancesFunc := func() (ret map[string]int64, err error) { compensatedBalancesFunc := func() (ret map[string]int64, err error) {
ret = make(map[string]int64) ret = make(map[string]int64)
ret["DEAD"] = 1000000000000000000 ret["DEAD"] = 1000000000000000000
ret["BEEF"] = -100000000000000000 ret["BEEF"] = -100000000000000000
...@@ -27,7 +27,7 @@ func TestBalances(t *testing.T) { ...@@ -27,7 +27,7 @@ func TestBalances(t *testing.T) {
return ret, err return ret, err
} }
testServer := newTestServer(t, testServerOptions{ testServer := newTestServer(t, testServerOptions{
AccountingOpts: []mock.Option{mock.WithBalancesFunc(balancesFunc)}, AccountingOpts: []mock.Option{mock.WithCompensatedBalancesFunc(compensatedBalancesFunc)},
}) })
expected := &debugapi.BalancesResponse{ expected := &debugapi.BalancesResponse{
...@@ -61,11 +61,11 @@ func TestBalances(t *testing.T) { ...@@ -61,11 +61,11 @@ func TestBalances(t *testing.T) {
func TestBalancesError(t *testing.T) { func TestBalancesError(t *testing.T) {
wantErr := errors.New("ASDF") wantErr := errors.New("ASDF")
balancesFunc := func() (ret map[string]int64, err error) { compensatedBalancesFunc := func() (ret map[string]int64, err error) {
return nil, wantErr return nil, wantErr
} }
testServer := newTestServer(t, testServerOptions{ testServer := newTestServer(t, testServerOptions{
AccountingOpts: []mock.Option{mock.WithBalancesFunc(balancesFunc)}, AccountingOpts: []mock.Option{mock.WithCompensatedBalancesFunc(compensatedBalancesFunc)},
}) })
jsonhttptest.Request(t, testServer.Client, http.MethodGet, "/balances", http.StatusInternalServerError, jsonhttptest.Request(t, testServer.Client, http.MethodGet, "/balances", http.StatusInternalServerError,
...@@ -78,11 +78,11 @@ func TestBalancesError(t *testing.T) { ...@@ -78,11 +78,11 @@ func TestBalancesError(t *testing.T) {
func TestBalancesPeers(t *testing.T) { func TestBalancesPeers(t *testing.T) {
peer := "bff2c89e85e78c38bd89fca1acc996afb876c21bf5a8482ad798ce15f1c223fa" peer := "bff2c89e85e78c38bd89fca1acc996afb876c21bf5a8482ad798ce15f1c223fa"
balanceFunc := func(swarm.Address) (int64, error) { compensatedBalanceFunc := func(swarm.Address) (int64, error) {
return 1000000000000000000, nil return 1000000000000000000, nil
} }
testServer := newTestServer(t, testServerOptions{ testServer := newTestServer(t, testServerOptions{
AccountingOpts: []mock.Option{mock.WithBalanceFunc(balanceFunc)}, AccountingOpts: []mock.Option{mock.WithCompensatedBalanceFunc(compensatedBalanceFunc)},
}) })
jsonhttptest.Request(t, testServer.Client, http.MethodGet, "/balances/"+peer, http.StatusOK, jsonhttptest.Request(t, testServer.Client, http.MethodGet, "/balances/"+peer, http.StatusOK,
...@@ -96,11 +96,11 @@ func TestBalancesPeers(t *testing.T) { ...@@ -96,11 +96,11 @@ func TestBalancesPeers(t *testing.T) {
func TestBalancesPeersError(t *testing.T) { func TestBalancesPeersError(t *testing.T) {
peer := "bff2c89e85e78c38bd89fca1acc996afb876c21bf5a8482ad798ce15f1c223fa" peer := "bff2c89e85e78c38bd89fca1acc996afb876c21bf5a8482ad798ce15f1c223fa"
wantErr := errors.New("Error") wantErr := errors.New("Error")
balanceFunc := func(swarm.Address) (int64, error) { compensatedBalanceFunc := func(swarm.Address) (int64, error) {
return 0, wantErr return 0, wantErr
} }
testServer := newTestServer(t, testServerOptions{ testServer := newTestServer(t, testServerOptions{
AccountingOpts: []mock.Option{mock.WithBalanceFunc(balanceFunc)}, AccountingOpts: []mock.Option{mock.WithCompensatedBalanceFunc(compensatedBalanceFunc)},
}) })
jsonhttptest.Request(t, testServer.Client, http.MethodGet, "/balances/"+peer, http.StatusInternalServerError, jsonhttptest.Request(t, testServer.Client, http.MethodGet, "/balances/"+peer, http.StatusInternalServerError,
...@@ -113,11 +113,11 @@ func TestBalancesPeersError(t *testing.T) { ...@@ -113,11 +113,11 @@ func TestBalancesPeersError(t *testing.T) {
func TestBalancesPeersNoBalance(t *testing.T) { func TestBalancesPeersNoBalance(t *testing.T) {
peer := "bff2c89e85e78c38bd89fca1acc996afb876c21bf5a8482ad798ce15f1c223fa" peer := "bff2c89e85e78c38bd89fca1acc996afb876c21bf5a8482ad798ce15f1c223fa"
balanceFunc := func(swarm.Address) (int64, error) { compensatedBalanceFunc := func(swarm.Address) (int64, error) {
return 0, accounting.ErrPeerNoBalance return 0, accounting.ErrPeerNoBalance
} }
testServer := newTestServer(t, testServerOptions{ testServer := newTestServer(t, testServerOptions{
AccountingOpts: []mock.Option{mock.WithBalanceFunc(balanceFunc)}, AccountingOpts: []mock.Option{mock.WithCompensatedBalanceFunc(compensatedBalanceFunc)},
}) })
jsonhttptest.Request(t, testServer.Client, http.MethodGet, "/balances/"+peer, http.StatusNotFound, jsonhttptest.Request(t, testServer.Client, http.MethodGet, "/balances/"+peer, http.StatusNotFound,
...@@ -170,3 +170,126 @@ func equalBalances(a, b *debugapi.BalancesResponse) bool { ...@@ -170,3 +170,126 @@ func equalBalances(a, b *debugapi.BalancesResponse) bool {
return true return true
} }
func TestConsumedBalances(t *testing.T) {
balancesFunc := func() (ret map[string]int64, err error) {
ret = make(map[string]int64)
ret["DEAD"] = 1000000000000000000
ret["BEEF"] = -100000000000000000
ret["PARTY"] = 0
return ret, err
}
testServer := newTestServer(t, testServerOptions{
AccountingOpts: []mock.Option{mock.WithBalancesFunc(balancesFunc)},
})
expected := &debugapi.BalancesResponse{
[]debugapi.BalanceResponse{
{
Peer: "DEAD",
Balance: 1000000000000000000,
},
{
Peer: "BEEF",
Balance: -100000000000000000,
},
{
Peer: "PARTY",
Balance: 0,
},
},
}
// We expect a list of items unordered by peer:
var got *debugapi.BalancesResponse
jsonhttptest.Request(t, testServer.Client, http.MethodGet, "/consumed", http.StatusOK,
jsonhttptest.WithUnmarshalJSONResponse(&got),
)
if !equalBalances(got, expected) {
t.Errorf("got balances: %v, expected: %v", got, expected)
}
}
func TestConsumedError(t *testing.T) {
wantErr := errors.New("ASDF")
balancesFunc := func() (ret map[string]int64, err error) {
return nil, wantErr
}
testServer := newTestServer(t, testServerOptions{
AccountingOpts: []mock.Option{mock.WithBalancesFunc(balancesFunc)},
})
jsonhttptest.Request(t, testServer.Client, http.MethodGet, "/consumed", http.StatusInternalServerError,
jsonhttptest.WithExpectedJSONResponse(jsonhttp.StatusResponse{
Message: debugapi.ErrCantBalances,
Code: http.StatusInternalServerError,
}),
)
}
func TestConsumedPeers(t *testing.T) {
peer := "bff2c89e85e78c38bd89fca1acc996afb876c21bf5a8482ad798ce15f1c223fa"
balanceFunc := func(swarm.Address) (int64, error) {
return 1000000000000000000, nil
}
testServer := newTestServer(t, testServerOptions{
AccountingOpts: []mock.Option{mock.WithBalanceFunc(balanceFunc)},
})
jsonhttptest.Request(t, testServer.Client, http.MethodGet, "/consumed/"+peer, http.StatusOK,
jsonhttptest.WithExpectedJSONResponse(debugapi.BalanceResponse{
Peer: peer,
Balance: 1000000000000000000,
}),
)
}
func TestConsumedPeersError(t *testing.T) {
peer := "bff2c89e85e78c38bd89fca1acc996afb876c21bf5a8482ad798ce15f1c223fa"
wantErr := errors.New("Error")
balanceFunc := func(swarm.Address) (int64, error) {
return 0, wantErr
}
testServer := newTestServer(t, testServerOptions{
AccountingOpts: []mock.Option{mock.WithBalanceFunc(balanceFunc)},
})
jsonhttptest.Request(t, testServer.Client, http.MethodGet, "/consumed/"+peer, http.StatusInternalServerError,
jsonhttptest.WithExpectedJSONResponse(jsonhttp.StatusResponse{
Message: debugapi.ErrCantBalance,
Code: http.StatusInternalServerError,
}),
)
}
func TestConsumedPeersNoBalance(t *testing.T) {
peer := "bff2c89e85e78c38bd89fca1acc996afb876c21bf5a8482ad798ce15f1c223fa"
balanceFunc := func(swarm.Address) (int64, error) {
return 0, accounting.ErrPeerNoBalance
}
testServer := newTestServer(t, testServerOptions{
AccountingOpts: []mock.Option{mock.WithBalanceFunc(balanceFunc)},
})
jsonhttptest.Request(t, testServer.Client, http.MethodGet, "/consumed/"+peer, http.StatusNotFound,
jsonhttptest.WithExpectedJSONResponse(jsonhttp.StatusResponse{
Message: debugapi.ErrNoBalance,
Code: http.StatusNotFound,
}),
)
}
func TestConsumedInvalidAddress(t *testing.T) {
peer := "bad peer address"
testServer := newTestServer(t, testServerOptions{})
jsonhttptest.Request(t, testServer.Client, http.MethodGet, "/consumed/"+peer, http.StatusNotFound,
jsonhttptest.WithExpectedJSONResponse(jsonhttp.StatusResponse{
Message: debugapi.ErrInvaliAddress,
Code: http.StatusNotFound,
}),
)
}
...@@ -84,16 +84,27 @@ func (s *server) setupRouting() { ...@@ -84,16 +84,27 @@ func (s *server) setupRouting() {
web.FinalHandlerFunc(s.setWelcomeMessageHandler), web.FinalHandlerFunc(s.setWelcomeMessageHandler),
), ),
}) })
router.Handle("/balances", jsonhttp.MethodHandler{ router.Handle("/balances", jsonhttp.MethodHandler{
"GET": http.HandlerFunc(s.balancesHandler), "GET": http.HandlerFunc(s.compensatedBalancesHandler),
}) })
router.Handle("/balances/{peer}", jsonhttp.MethodHandler{ router.Handle("/balances/{peer}", jsonhttp.MethodHandler{
"GET": http.HandlerFunc(s.compensatedPeerBalanceHandler),
})
router.Handle("/consumed", jsonhttp.MethodHandler{
"GET": http.HandlerFunc(s.balancesHandler),
})
router.Handle("/consumed/{peer}", jsonhttp.MethodHandler{
"GET": http.HandlerFunc(s.peerBalanceHandler), "GET": http.HandlerFunc(s.peerBalanceHandler),
}) })
router.Handle("/settlements", jsonhttp.MethodHandler{ router.Handle("/settlements", jsonhttp.MethodHandler{
"GET": http.HandlerFunc(s.settlementsHandler), "GET": http.HandlerFunc(s.settlementsHandler),
}) })
router.Handle("/settlements/{peer}", jsonhttp.MethodHandler{ router.Handle("/settlements/{peer}", jsonhttp.MethodHandler{
"GET": http.HandlerFunc(s.peerSettlementsHandler), "GET": http.HandlerFunc(s.peerSettlementsHandler),
}) })
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment