Commit 80d9ac0c authored by metacertain's avatar metacertain Committed by GitHub

Safety checks in accounting (#642)

Safety checks in accounting
Correct restoring balance after unsuccessful payment
Added code comments for arithmetic safety related parts
Co-authored-by: default avatarPavle Batuta <pavle@batuta.xyz>
parent fd0fbb93
......@@ -8,6 +8,7 @@ import (
"context"
"errors"
"fmt"
"math"
"strings"
"sync"
"time"
......@@ -24,31 +25,35 @@ var (
balancesPrefix string = "balance_"
)
// Interface is the main interface for Accounting
// Interface is the Accounting interface.
type Interface interface {
// Reserve reserves a portion of the balance for peer
// It returns an error if the operation would risk exceeding the disconnect threshold
// This should be called (always in combination with Release) before a Credit action to prevent overspending in case of concurrent requests
// Reserve reserves a portion of the balance for peer. Returns an error if
// the operation risks exceeding the disconnect threshold.
//
// This should be called (always in combination with Release) before a
// Credit action to prevent overspending in case of concurrent requests.
Reserve(peer swarm.Address, price uint64) error
// Release releases reserved funds
// Release releases the reserved funds.
Release(peer swarm.Address, price uint64)
// Credit increases the balance the peer has with us (we "pay" the peer)
// Credit increases the balance the peer has with us (we "pay" the peer).
Credit(peer swarm.Address, price uint64) error
// Debit increases the balance we have with the peer (we get "paid")
// Debit increases the balance we have with the peer (we get "paid" back).
Debit(peer swarm.Address, price uint64) error
// Balance returns the current balance for the given peer
// Balance returns the current balance for the given peer.
Balance(peer swarm.Address) (int64, error)
// Balances returns balances for all known peers
// Balances returns balances for all known peers.
Balances() (map[string]int64, error)
}
// accountingPeer holds all in-memory accounting information for one peer
// accountingPeer holds all in-memory accounting information for one peer.
type accountingPeer struct {
lock sync.Mutex // lock to be held during any accounting action for this peer
reservedBalance uint64 // amount currently reserved for active peer interaction
// Lock to be held during any accounting action for this peer.
lock sync.Mutex
// Amount currently reserved for active peer interaction
reservedBalance uint64
}
// Options for accounting
// Options are options provided to Accounting.
type Options struct {
PaymentThreshold uint64
PaymentTolerance uint64
......@@ -57,31 +62,42 @@ type Options struct {
Settlement settlement.Interface
}
// Accounting is the main implementation of the accounting interface
// Accounting is the main implementation of the accounting interface.
type Accounting struct {
accountingPeersMu sync.Mutex // mutex for accessing the accountingPeers map
// Mutex for accessing the accountingPeers map.
accountingPeersMu sync.Mutex
accountingPeers map[string]*accountingPeer
logger logging.Logger
store storage.StateStorer
paymentThreshold uint64 // the payment threshold in BZZ we communicate to our peers
paymentTolerance uint64 // the amount in BZZ we let peers exceed the payment threshold before disconnected
// The payment threshold in BZZ we communicate to our peers.
paymentThreshold uint64
// The amount in BZZ we let peers exceed the payment threshold before we
// disconnect them.
paymentTolerance uint64
settlement settlement.Interface
metrics metrics
}
var (
// ErrOverdraft is the error returned if the expected debt in Reserve would exceed the payment thresholds
// ErrOverdraft denotes the expected debt in Reserve would exceed the payment thresholds.
ErrOverdraft = errors.New("attempted overdraft")
// ErrDisconnectThresholdExceeded is the error returned if a peer has exceeded the disconnect threshold
// ErrDisconnectThresholdExceeded denotes a peer has exceeded the disconnect threshold.
ErrDisconnectThresholdExceeded = errors.New("disconnect threshold exceeded")
// ErrInvalidPaymentTolerance is the error returned if the payment tolerance is too high compared to the payment threshold
// ErrInvalidPaymentTolerance denotes the payment tolerance is too high
// compared to the payment threshold.
ErrInvalidPaymentTolerance = errors.New("payment tolerance must be less than half the payment threshold")
// ErrPeerNoBalance is the error returned if no balance in store exists for a peer
ErrPeerNoBalance = errors.New("no balance for peer")
// ErrOverflow denotes an arithmetic operation overflowed.
ErrOverflow = errors.New("overflow error")
)
// NewAccounting creates a new Accounting instance with the provided options
// NewAccounting creates a new Accounting instance with the provided options.
func NewAccounting(o Options) (*Accounting, error) {
if o.PaymentTolerance+o.PaymentThreshold > math.MaxInt64 {
return nil, fmt.Errorf("tolerance plus threshold too big: %w", ErrOverflow)
}
if o.PaymentTolerance > o.PaymentThreshold/2 {
return nil, ErrInvalidPaymentTolerance
}
......@@ -97,7 +113,7 @@ func NewAccounting(o Options) (*Accounting, error) {
}, nil
}
// Reserve reserves a portion of the balance for peer
// Reserve reserves a portion of the balance for peer.
func (a *Accounting) Reserve(peer swarm.Address, price uint64) error {
accountingPeer, err := a.getAccountingPeer(peer)
if err != nil {
......@@ -114,22 +130,36 @@ func (a *Accounting) Reserve(peer swarm.Address, price uint64) error {
}
}
expectedDebt := -(currentBalance - int64(accountingPeer.reservedBalance))
// Subtract already reserved amount from actual balance, to get expected balance
expectedBalance, err := subtractI64mU64(currentBalance, accountingPeer.reservedBalance)
if err != nil {
return err
}
// Determine if we owe anything to the peer, if we owe less than 0, we conclude we owe nothing
// This conversion is made safe by previous subtractI64mU64 not allowing MinInt64
expectedDebt := -expectedBalance
if expectedDebt < 0 {
expectedDebt = 0
}
// check if the expected debt is already over the payment threshold
// Check if the expected debt is already over the payment threshold.
if uint64(expectedDebt) > a.paymentThreshold {
a.metrics.AccountingBlocksCount.Inc()
return ErrOverdraft
}
// Check for safety of increase of reservedBalance by price
if accountingPeer.reservedBalance+price < accountingPeer.reservedBalance {
return ErrOverflow
}
accountingPeer.reservedBalance += price
return nil
}
// Release releases reserved funds
// Release releases reserved funds.
func (a *Accounting) Release(peer swarm.Address, price uint64) {
accountingPeer, err := a.getAccountingPeer(peer)
if err != nil {
......@@ -140,8 +170,8 @@ func (a *Accounting) Release(peer swarm.Address, price uint64) {
accountingPeer.lock.Lock()
defer accountingPeer.lock.Unlock()
// NOTE: this should never happen if Reserve and Release calls are paired.
if price > accountingPeer.reservedBalance {
// If Reserve and Release calls are always paired this should never happen
a.logger.Error("attempting to release more balance than was reserved for peer")
accountingPeer.reservedBalance = 0
} else {
......@@ -149,7 +179,8 @@ func (a *Accounting) Release(peer swarm.Address, price uint64) {
}
}
// Credit increases the amount of credit we have with the given peer (and decreases existing debt).
// Credit increases the amount of credit we have with the given peer
// (and decreases existing debt).
func (a *Accounting) Credit(peer swarm.Address, price uint64) error {
accountingPeer, err := a.getAccountingPeer(peer)
if err != nil {
......@@ -166,12 +197,24 @@ func (a *Accounting) Credit(peer swarm.Address, price uint64) error {
}
}
nextBalance := currentBalance - int64(price)
// Calculate next balance by safely decreasing current balance with the price we credit
nextBalance, err := subtractI64mU64(currentBalance, price)
if err != nil {
return err
}
a.logger.Tracef("crediting peer %v with price %d, new balance is %d", peer, price, nextBalance)
// compute expected debt before update because reserve still includes the amount that is deducted from the balance
expectedDebt := -(currentBalance - int64(accountingPeer.reservedBalance))
// Get expectedbalance by safely decreasing current balance with reserved amounts
expectedBalance, err := subtractI64mU64(currentBalance, accountingPeer.reservedBalance)
if err != nil {
return err
}
// Compute expected debt before update because reserve still includes the
// amount that is deducted from the balance.
// This conversion is made safe by previous subtractI64mU64 not allowing MinInt64
expectedDebt := -expectedBalance
if expectedDebt < 0 {
expectedDebt = 0
}
......@@ -184,7 +227,8 @@ func (a *Accounting) Credit(peer swarm.Address, price uint64) error {
a.metrics.TotalCreditedAmount.Add(float64(price))
a.metrics.CreditEventsCount.Inc()
// if our expected debt exceeds our payment threshold (which we assume is also the peers payment threshold), trigger settlement
// If our expected debt exceeds our payment threshold (which we assume is
// also the peers payment threshold), trigger settlement.
if uint64(expectedDebt) >= a.paymentThreshold {
err = a.settle(peer, accountingPeer)
if err != nil {
......@@ -195,8 +239,8 @@ func (a *Accounting) Credit(peer swarm.Address, price uint64) error {
return nil
}
// settle all debt with a peer
// the lock on the accountingPeer must be held when called
// Settle all debt with a peer. The lock on the accountingPeer must be held when
// called.
func (a *Accounting) settle(peer swarm.Address, balance *accountingPeer) error {
oldBalance, err := a.Balance(peer)
if err != nil {
......@@ -205,17 +249,25 @@ func (a *Accounting) settle(peer swarm.Address, balance *accountingPeer) error {
}
}
// don't do anything if there is no actual debt
// this might be the case if the peer owes us and the total reserve for a peer exceeds the payment treshhold
// Don't do anything if there is no actual debt.
// This might be the case if the peer owes us and the total reserve for a
// peer exceeds the payment treshold.
if oldBalance >= 0 {
return nil
}
// check safety of the following -1 * int64 conversion, all negative int64 have positive int64 equals except MinInt64
if oldBalance == math.MinInt64 {
return ErrOverflow
}
// This is safe because of the earlier check for oldbalance < 0 and the check for != MinInt64
paymentAmount := uint64(-oldBalance)
nextBalance := oldBalance + int64(paymentAmount)
nextBalance := 0
// try to save the next balance first
// otherwise we might pay and then not be able to save, thus paying again after restart
// Try to save the next balance first.
// Otherwise we might pay and then not be able to save, forcing us to pay
// again after restart.
err = a.store.Put(peerBalanceKey(peer), nextBalance)
if err != nil {
return fmt.Errorf("failed to persist balance: %w", err)
......@@ -224,16 +276,19 @@ func (a *Accounting) settle(peer swarm.Address, balance *accountingPeer) error {
err = a.settlement.Pay(context.Background(), peer, paymentAmount)
if err != nil {
err = fmt.Errorf("settlement for amount %d failed: %w", paymentAmount, err)
// if the payment didn't work we should restore the old balance in the state store
if storeErr := a.store.Put(peerBalanceKey(peer), nextBalance); storeErr != nil {
// If the payment didn't succeed we should restore the old balance in
// the state store.
if storeErr := a.store.Put(peerBalanceKey(peer), oldBalance); storeErr != nil {
a.logger.Errorf("failed to restore balance after failed settlement for peer %v: %v", peer, storeErr)
}
return err
}
return nil
}
// Debit increases the amount of debt we have with the given peer (and decreases existing credit)
// Debit increases the amount of debt we have with the given peer (and decreases
// existing credit).
func (a *Accounting) Debit(peer swarm.Address, price uint64) error {
accountingPeer, err := a.getAccountingPeer(peer)
if err != nil {
......@@ -249,7 +304,12 @@ func (a *Accounting) Debit(peer swarm.Address, price uint64) error {
return fmt.Errorf("failed to load balance: %w", err)
}
}
nextBalance := currentBalance + int64(price)
// Get nextBalance by safely increasing current balance with price
nextBalance, err := addI64pU64(currentBalance, price)
if err != nil {
return err
}
a.logger.Tracef("debiting peer %v with price %d, new balance is %d", peer, price, nextBalance)
......@@ -270,25 +330,27 @@ func (a *Accounting) Debit(peer swarm.Address, price uint64) error {
return nil
}
// Balance returns the current balance for the given peer
// Balance returns the current balance for the given peer.
func (a *Accounting) Balance(peer swarm.Address) (balance int64, err error) {
err = a.store.Get(peerBalanceKey(peer), &balance)
if err != nil {
if errors.Is(err, storage.ErrNotFound) {
return 0, ErrPeerNoBalance
}
return 0, err
}
return balance, nil
}
// get the balance storage key for the given peer
// peerBalanceKey returns the balance storage key for the given peer.
func peerBalanceKey(peer swarm.Address) string {
return fmt.Sprintf("%s%s", balancesPrefix, peer.String())
}
// getAccountingPeer gets the accountingPeer for a given swarm address
// If not in memory it will initialize it
// getAccountingPeer returns the accountingPeer for a given swarm address.
// If not found in memory it will initialize it.
func (a *Accounting) getAccountingPeer(peer swarm.Address) (*accountingPeer, error) {
a.accountingPeersMu.Lock()
defer a.accountingPeersMu.Unlock()
......@@ -304,14 +366,16 @@ func (a *Accounting) getAccountingPeer(peer swarm.Address) (*accountingPeer, err
return peerData, nil
}
// Balances gets balances for all peers from store
// Balances gets balances for all peers from store.
func (a *Accounting) Balances() (map[string]int64, error) {
s := make(map[string]int64)
err := a.store.Iterate(balancesPrefix, func(key, val []byte) (stop bool, err error) {
addr, err := balanceKeyPeer(key)
if err != nil {
return false, fmt.Errorf("parse address from key: %s: %v", string(key), err)
}
if _, ok := s[addr.String()]; !ok {
var storevalue int64
err = a.store.Get(peerBalanceKey(addr), &storevalue)
......@@ -321,15 +385,18 @@ func (a *Accounting) Balances() (map[string]int64, error) {
s[addr.String()] = storevalue
}
return false, nil
})
if err != nil {
return nil, err
}
return s, nil
}
// get the embedded peer from the balance storage key
// balanceKeyPeer returns the embedded peer from the balance storage key.
func balanceKeyPeer(key []byte) (swarm.Address, error) {
k := string(key)
......@@ -346,8 +413,8 @@ func balanceKeyPeer(key []byte) (swarm.Address, error) {
return addr, nil
}
// NotifyPayment is called by Settlement when we received payment
// Implements the PaymentObserver interface
// NotifyPayment implements the PaymentObserver interface. It is called by
// Settlement when we receive a payment.
func (a *Accounting) NotifyPayment(peer swarm.Address, amount uint64) error {
accountingPeer, err := a.getAccountingPeer(peer)
if err != nil {
......@@ -363,10 +430,15 @@ func (a *Accounting) NotifyPayment(peer swarm.Address, amount uint64) error {
return err
}
}
nextBalance := currentBalance - int64(amount)
// don't allow a payment to put use more into debt than the tolerance
// this is to prevent another node tricking us into settling by settling first (e.g. send a bouncing cheque to trigger an honest cheque in swap)
nextBalance, err := subtractI64mU64(currentBalance, amount)
if err != nil {
return err
}
// Don't allow a payment to put use more into debt than the tolerance.
// This is to prevent another node tricking us into settling by settling
// first (e.g. send a bouncing cheque to trigger an honest cheque in swap).
if nextBalance < -int64(a.paymentTolerance) {
return fmt.Errorf("refusing to accept payment which would put us too much in debt, new balance would have been %d", nextBalance)
}
......@@ -380,3 +452,46 @@ func (a *Accounting) NotifyPayment(peer swarm.Address, amount uint64) error {
return nil
}
// subtractI64mU64 is a helper function for safe subtraction of Int64 - Uint64
// It checks for
// - overflow safety in conversion of uint64 to int64
// - safety of the arithmetic
// - whether ( -1 * result ) is still Int64, as MinInt64 in absolute sense is 1 larger than MaxInt64
// If result is MinInt64, we also return overflow error, for two reasons:
// - in some cases we are going to use -1 * result in the following operations, which is secured by this check
// - we also do not want to possibly store this value as balance, even if ( -1 * result ) is not used immediately afterwards, because it could
// disable settleing for this amount as the value would create overflow
func subtractI64mU64(base int64, subtracted uint64) (result int64, err error) {
if subtracted > math.MaxInt64 {
return 0, ErrOverflow
}
result = base - int64(subtracted)
if result > base {
return 0, ErrOverflow
}
if result == math.MinInt64 {
return 0, ErrOverflow
}
return result, nil
}
// addI64pU64 is a helper function for safe addition of Int64 + Uint64
// It checks for
// - overflow safety in conversion of uint64 to int64
// - safety of the arithmetic
func addI64pU64(a int64, b uint64) (result int64, err error) {
if b > math.MaxInt64 {
return 0, ErrOverflow
}
result = a + int64(b)
if result < a {
return 0, ErrOverflow
}
return result, nil
}
......@@ -8,6 +8,7 @@ import (
"context"
"errors"
"io/ioutil"
"math"
"testing"
"github.com/ethersphere/bee/pkg/accounting"
......@@ -19,6 +20,7 @@ import (
const (
testPaymentThreshold = 10000
testPaymentThresholdLarge = math.MaxInt64 - 1
testPaymentTolerance = 1000
testPrice = uint64(10)
)
......@@ -197,7 +199,196 @@ func TestAccountingReserve(t *testing.T) {
}
}
// TestAccountingDisconnect tests that exceeding the disconnect threshold with Debit returns a p2p.BlockPeerError
func TestAccountingOverflowReserve(t *testing.T) {
logger := logging.New(ioutil.Discard, 0)
store := mock.NewStateStore()
defer store.Close()
settlement := &settlementMock{}
acc, err := accounting.NewAccounting(accounting.Options{
PaymentThreshold: testPaymentThresholdLarge,
Logger: logger,
Store: store,
Settlement: settlement,
})
if err != nil {
t.Fatal(err)
}
peer1Addr, err := swarm.ParseHexAddress("00112233")
if err != nil {
t.Fatal(err)
}
// Try crediting near maximal value for peer
err = acc.Credit(peer1Addr, math.MaxInt64-2)
if err != nil {
t.Fatal(err)
}
// Try reserving further maximal value
err = acc.Reserve(peer1Addr, math.MaxInt64)
if err != nil {
t.Fatal(err)
}
// Try reserving further value, should overflow
err = acc.Reserve(peer1Addr, 1)
if err == nil {
t.Fatal("expected error")
}
// If we had other error, assert fail
if !errors.Is(err, accounting.ErrOverflow) {
t.Fatalf("expected overflow error from Debit, got %v", err)
}
// Try reserving further near maximal value
err = acc.Reserve(peer1Addr, math.MaxInt64-2)
if err == nil {
t.Fatal("expected error")
}
// If we had other error, assert fail
if !errors.Is(err, accounting.ErrOverflow) {
t.Fatalf("expected overflow error from Debit, got %v", err)
}
}
func TestAccountingOverflowNotifyPayment(t *testing.T) {
logger := logging.New(ioutil.Discard, 0)
store := mock.NewStateStore()
defer store.Close()
settlement := &settlementMock{}
acc, err := accounting.NewAccounting(accounting.Options{
PaymentThreshold: testPaymentThresholdLarge,
Logger: logger,
Store: store,
Settlement: settlement,
})
if err != nil {
t.Fatal(err)
}
peer1Addr, err := swarm.ParseHexAddress("00112233")
if err != nil {
t.Fatal(err)
}
// Try Crediting a large amount to peer so balance is negative
err = acc.Credit(peer1Addr, math.MaxInt64)
if err != nil {
t.Fatal(err)
}
// Notify of incoming payment from same peer, further decreasing balance, this should overflow
err = acc.NotifyPayment(peer1Addr, math.MaxInt64)
if err == nil {
t.Fatal("Expected overflow from NotifyPayment")
}
// If we had other error, assert fail
if !errors.Is(err, accounting.ErrOverflow) {
t.Fatalf("expected overflow error from Debit, got %v", err)
}
}
func TestAccountingOverflowDebit(t *testing.T) {
logger := logging.New(ioutil.Discard, 0)
store := mock.NewStateStore()
defer store.Close()
acc, err := accounting.NewAccounting(accounting.Options{
PaymentThreshold: testPaymentThresholdLarge,
Logger: logger,
Store: store,
})
if err != nil {
t.Fatal(err)
}
peer1Addr, err := swarm.ParseHexAddress("00112233")
if err != nil {
t.Fatal(err)
}
// Try increasing peer debit with near maximal value
err = acc.Debit(peer1Addr, math.MaxInt64-2)
if err != nil {
t.Fatal(err)
}
// Try further increasing peer debit with near maximal value, this should fail
err = acc.Debit(peer1Addr, math.MaxInt64-2)
if err == nil {
t.Fatal("Expected error from overflow Debit")
}
// If we had other error, assert fail
if !errors.Is(err, accounting.ErrOverflow) {
t.Fatalf("expected overflow error from Credit, got %v", err)
}
// Try further increasing peer debit with near maximal value, this should fail
err = acc.Debit(peer1Addr, math.MaxInt64)
if err == nil {
t.Fatal("Expected error from overflow Debit")
}
// If we had other error, assert fail
if !errors.Is(err, accounting.ErrOverflow) {
t.Fatalf("expected overflow error from Debit, got %v", err)
}
}
func TestAccountingOverflowCredit(t *testing.T) {
logger := logging.New(ioutil.Discard, 0)
store := mock.NewStateStore()
defer store.Close()
acc, err := accounting.NewAccounting(accounting.Options{
PaymentThreshold: testPaymentThresholdLarge,
Logger: logger,
Store: store,
})
if err != nil {
t.Fatal(err)
}
peer1Addr, err := swarm.ParseHexAddress("00112233")
if err != nil {
t.Fatal(err)
}
// Try increasing peer credit with near maximal value
err = acc.Credit(peer1Addr, math.MaxInt64-2)
if err != nil {
t.Fatal(err)
}
// Try increasing with a further near maximal value, this must overflow
err = acc.Credit(peer1Addr, math.MaxInt64-2)
if err == nil {
t.Fatal("Expected error from overflow Credit")
}
// Try increasing with a small amount, which should also overflow
err = acc.Credit(peer1Addr, 3)
if err == nil {
t.Fatal("Expected error from overflow Credit")
}
// If we had other error, assert fail
if !errors.Is(err, accounting.ErrOverflow) {
t.Fatalf("expected overflow error from Credit, got %v", err)
}
// Try increasing with maximal value
err = acc.Credit(peer1Addr, math.MaxInt64)
if err == nil {
t.Fatal("Expected error from overflow Credit")
}
// If we had other error, assert fail
if !errors.Is(err, accounting.ErrOverflow) {
t.Fatalf("expected overflow error from Credit, got %v", err)
}
}
// TestAccountingDisconnect tests that exceeding the disconnect threshold with Debit returns a p2p.DisconnectError
func TestAccountingDisconnect(t *testing.T) {
logger := logging.New(ioutil.Discard, 0)
......
......@@ -64,6 +64,7 @@ func newMetrics() metrics {
}
}
// Metrics returns the prometheus Collector for the accounting service.
func (a *Accounting) Metrics() []prometheus.Collector {
return m.PrometheusCollectorsFromFields(a.metrics)
}
......@@ -8,18 +8,21 @@ import (
"github.com/ethersphere/bee/pkg/swarm"
)
// Pricer returns pricing information for chunk hashes.
type Pricer interface {
// PeerPrice is the price the peer charges for a given chunk hash
// PeerPrice is the price the peer charges for a given chunk hash.
PeerPrice(peer, chunk swarm.Address) uint64
// Price is the price we charge for a given chunk hash
// Price is the price we charge for a given chunk hash.
Price(chunk swarm.Address) uint64
}
// FixedPricer is a Pricer that has a fixed price for chunks.
type FixedPricer struct {
overlay swarm.Address
poPrice uint64
}
// NewFixedPricer returns a new FixedPricer with a given price.
func NewFixedPricer(overlay swarm.Address, poPrice uint64) *FixedPricer {
return &FixedPricer{
overlay: overlay,
......@@ -27,10 +30,12 @@ func NewFixedPricer(overlay swarm.Address, poPrice uint64) *FixedPricer {
}
}
// PeerPrice implements Pricer.
func (pricer *FixedPricer) PeerPrice(peer, chunk swarm.Address) uint64 {
return uint64(swarm.MaxPO-swarm.Proximity(peer.Bytes(), chunk.Bytes())) * pricer.poPrice
}
// Price implements Pricer.
func (pricer *FixedPricer) Price(chunk swarm.Address) uint64 {
return pricer.PeerPrice(pricer.overlay, chunk)
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment