Commit 526dce5e authored by Ralph Pichler's avatar Ralph Pichler Committed by GitHub

Always load balance from store (#580)

parent 029cf9b7
...@@ -41,11 +41,10 @@ type Interface interface { ...@@ -41,11 +41,10 @@ type Interface interface {
Balances() (map[string]int64, error) Balances() (map[string]int64, error)
} }
// PeerBalance holds all relevant accounting information for one peer // accountingPeer holds all in-memory accounting information for one peer
type PeerBalance struct { type accountingPeer struct {
lock sync.Mutex lock sync.Mutex // lock to be held during any accounting action for this peer
balance int64 // amount that the peer owes us if positive, our debt if negative reservedBalance uint64 // amount currently reserved for active peer interaction
reserved uint64 // amount currently reserved for active peer interaction
} }
// Options for accounting // Options for accounting
...@@ -59,14 +58,14 @@ type Options struct { ...@@ -59,14 +58,14 @@ type Options struct {
// Accounting is the main implementation of the accounting interface // Accounting is the main implementation of the accounting interface
type Accounting struct { type Accounting struct {
balancesMu sync.Mutex // mutex for accessing the balances map accountingPeersMu sync.Mutex // mutex for accessing the accountingPeers map
balances map[string]*PeerBalance accountingPeers map[string]*accountingPeer
logger logging.Logger logger logging.Logger
store storage.StateStorer store storage.StateStorer
paymentThreshold uint64 // the payment threshold in BZZ we communicate to our peers paymentThreshold uint64 // the payment threshold in BZZ we communicate to our peers
paymentTolerance uint64 // the amount in BZZ we let peers exceed the payment threshold before disconnected paymentTolerance uint64 // the amount in BZZ we let peers exceed the payment threshold before disconnected
settlement settlement.Interface settlement settlement.Interface
metrics metrics metrics metrics
} }
var ( var (
...@@ -85,7 +84,7 @@ func NewAccounting(o Options) (*Accounting, error) { ...@@ -85,7 +84,7 @@ func NewAccounting(o Options) (*Accounting, error) {
} }
return &Accounting{ return &Accounting{
balances: make(map[string]*PeerBalance), accountingPeers: make(map[string]*accountingPeer),
paymentThreshold: o.PaymentThreshold, paymentThreshold: o.PaymentThreshold,
paymentTolerance: o.PaymentTolerance, paymentTolerance: o.PaymentTolerance,
logger: o.Logger, logger: o.Logger,
...@@ -97,73 +96,90 @@ func NewAccounting(o Options) (*Accounting, error) { ...@@ -97,73 +96,90 @@ func NewAccounting(o Options) (*Accounting, error) {
// Reserve reserves a portion of the balance for peer // Reserve reserves a portion of the balance for peer
func (a *Accounting) Reserve(peer swarm.Address, price uint64) error { func (a *Accounting) Reserve(peer swarm.Address, price uint64) error {
balance, err := a.getPeerBalance(peer) accountingPeer, err := a.getAccountingPeer(peer)
if err != nil { if err != nil {
return err return err
} }
balance.lock.Lock() accountingPeer.lock.Lock()
defer balance.lock.Unlock() defer accountingPeer.lock.Unlock()
currentBalance, err := a.Balance(peer)
if err != nil {
return fmt.Errorf("failed to load balance: %w", err)
}
expectedDebt := -(currentBalance - int64(accountingPeer.reservedBalance))
if expectedDebt < 0 {
expectedDebt = 0
}
// check if the expected debt is already over the payment threshold // check if the expected debt is already over the payment threshold
if balance.expectedDebt() > a.paymentThreshold { if uint64(expectedDebt) > a.paymentThreshold {
a.metrics.AccountingBlocksCount.Inc() a.metrics.AccountingBlocksCount.Inc()
return ErrOverdraft return ErrOverdraft
} }
balance.reserved += price accountingPeer.reservedBalance += price
return nil return nil
} }
// Release releases reserved funds // Release releases reserved funds
func (a *Accounting) Release(peer swarm.Address, price uint64) { func (a *Accounting) Release(peer swarm.Address, price uint64) {
balance, err := a.getPeerBalance(peer) accountingPeer, err := a.getAccountingPeer(peer)
if err != nil { if err != nil {
a.logger.Errorf("cannot release balance for peer: %v", err) a.logger.Errorf("cannot release balance for peer: %v", err)
return return
} }
balance.lock.Lock() accountingPeer.lock.Lock()
defer balance.lock.Unlock() defer accountingPeer.lock.Unlock()
if price > balance.reserved { if price > accountingPeer.reservedBalance {
// If Reserve and Release calls are always paired this should never happen // If Reserve and Release calls are always paired this should never happen
a.logger.Error("attempting to release more balance than was reserved for peer") a.logger.Error("attempting to release more balance than was reserved for peer")
balance.reserved = 0 accountingPeer.reservedBalance = 0
} else { } else {
balance.reserved -= price accountingPeer.reservedBalance -= price
} }
} }
// Credit increases the amount of credit we have with the given peer (and decreases existing debt). // Credit increases the amount of credit we have with the given peer (and decreases existing debt).
func (a *Accounting) Credit(peer swarm.Address, price uint64) error { func (a *Accounting) Credit(peer swarm.Address, price uint64) error {
balance, err := a.getPeerBalance(peer) accountingPeer, err := a.getAccountingPeer(peer)
if err != nil { if err != nil {
return err return err
} }
balance.lock.Lock() accountingPeer.lock.Lock()
defer balance.lock.Unlock() defer accountingPeer.lock.Unlock()
nextBalance := balance.balance - int64(price) currentBalance, err := a.Balance(peer)
if err != nil {
return fmt.Errorf("failed to load balance: %w", err)
}
nextBalance := currentBalance - int64(price)
a.logger.Tracef("crediting peer %v with price %d, new balance is %d", peer, price, nextBalance) a.logger.Tracef("crediting peer %v with price %d, new balance is %d", peer, price, nextBalance)
// compute expected debt before update because reserve still includes the amount that is deducted from the balance // compute expected debt before update because reserve still includes the amount that is deducted from the balance
expectedDebt := balance.expectedDebt() expectedDebt := -(currentBalance - int64(accountingPeer.reservedBalance))
if expectedDebt < 0 {
expectedDebt = 0
}
err = a.store.Put(peerBalanceKey(peer), nextBalance) err = a.store.Put(peerBalanceKey(peer), nextBalance)
if err != nil { if err != nil {
return fmt.Errorf("failed to persist balance: %w", err) return fmt.Errorf("failed to persist balance: %w", err)
} }
balance.balance = nextBalance
a.metrics.TotalCreditedAmount.Add(float64(price)) a.metrics.TotalCreditedAmount.Add(float64(price))
a.metrics.CreditEventsCount.Inc() a.metrics.CreditEventsCount.Inc()
// if our expected debt exceeds our payment threshold (which we assume is also the peers payment threshold), trigger settlement // if our expected debt exceeds our payment threshold (which we assume is also the peers payment threshold), trigger settlement
if expectedDebt >= a.paymentThreshold { if uint64(expectedDebt) >= a.paymentThreshold {
err = a.settle(peer, balance) err = a.settle(peer, accountingPeer)
if err != nil { if err != nil {
a.logger.Errorf("failed to settle with peer %v: %v", peer, err) a.logger.Errorf("failed to settle with peer %v: %v", peer, err)
} }
...@@ -173,21 +189,25 @@ func (a *Accounting) Credit(peer swarm.Address, price uint64) error { ...@@ -173,21 +189,25 @@ func (a *Accounting) Credit(peer swarm.Address, price uint64) error {
} }
// settle all debt with a peer // settle all debt with a peer
// the lock on balance must be held when called // the lock on the accountingPeer must be held when called
func (a *Accounting) settle(peer swarm.Address, balance *PeerBalance) error { func (a *Accounting) settle(peer swarm.Address, balance *accountingPeer) error {
oldBalance, err := a.Balance(peer)
if err != nil {
return fmt.Errorf("failed to load balance: %w", err)
}
// don't do anything if there is no actual debt // don't do anything if there is no actual debt
// this might be the case if the peer owes us and the total reserve for a peer exceeds the payment treshhold // this might be the case if the peer owes us and the total reserve for a peer exceeds the payment treshhold
if balance.balance >= 0 { if oldBalance >= 0 {
return nil return nil
} }
paymentAmount := uint64(-balance.balance)
oldBalance := balance.balance paymentAmount := uint64(-oldBalance)
nextBalance := oldBalance + int64(paymentAmount) nextBalance := oldBalance + int64(paymentAmount)
// try to save the next balance first // try to save the next balance first
// otherwise we might pay and then not be able to save, thus paying again after restart // otherwise we might pay and then not be able to save, thus paying again after restart
err := a.store.Put(peerBalanceKey(peer), nextBalance) err = a.store.Put(peerBalanceKey(peer), nextBalance)
if err != nil { if err != nil {
return fmt.Errorf("failed to persist balance: %w", err) return fmt.Errorf("failed to persist balance: %w", err)
} }
...@@ -201,21 +221,24 @@ func (a *Accounting) settle(peer swarm.Address, balance *PeerBalance) error { ...@@ -201,21 +221,24 @@ func (a *Accounting) settle(peer swarm.Address, balance *PeerBalance) error {
} }
return err return err
} }
balance.balance = nextBalance
return nil return nil
} }
// Debit increases the amount of debt we have with the given peer (and decreases existing credit) // Debit increases the amount of debt we have with the given peer (and decreases existing credit)
func (a *Accounting) Debit(peer swarm.Address, price uint64) error { func (a *Accounting) Debit(peer swarm.Address, price uint64) error {
balance, err := a.getPeerBalance(peer) accountingPeer, err := a.getAccountingPeer(peer)
if err != nil { if err != nil {
return err return err
} }
balance.lock.Lock() accountingPeer.lock.Lock()
defer balance.lock.Unlock() defer accountingPeer.lock.Unlock()
nextBalance := balance.balance + int64(price) currentBalance, err := a.Balance(peer)
if err != nil {
return fmt.Errorf("failed to load balance: %w", err)
}
nextBalance := currentBalance + int64(price)
a.logger.Tracef("debiting peer %v with price %d, new balance is %d", peer, price, nextBalance) a.logger.Tracef("debiting peer %v with price %d, new balance is %d", peer, price, nextBalance)
...@@ -224,8 +247,6 @@ func (a *Accounting) Debit(peer swarm.Address, price uint64) error { ...@@ -224,8 +247,6 @@ func (a *Accounting) Debit(peer swarm.Address, price uint64) error {
return fmt.Errorf("failed to persist balance: %w", err) return fmt.Errorf("failed to persist balance: %w", err)
} }
balance.balance = nextBalance
a.metrics.TotalDebitedAmount.Add(float64(price)) a.metrics.TotalDebitedAmount.Add(float64(price))
a.metrics.DebitEventsCount.Inc() a.metrics.DebitEventsCount.Inc()
...@@ -239,12 +260,15 @@ func (a *Accounting) Debit(peer swarm.Address, price uint64) error { ...@@ -239,12 +260,15 @@ func (a *Accounting) Debit(peer swarm.Address, price uint64) error {
} }
// Balance returns the current balance for the given peer // Balance returns the current balance for the given peer
func (a *Accounting) Balance(peer swarm.Address) (int64, error) { func (a *Accounting) Balance(peer swarm.Address) (balance int64, err error) {
peerBalance, err := a.getPeerBalance(peer) err = a.store.Get(peerBalanceKey(peer), &balance)
if err != nil { if err != nil {
if errors.Is(err, storage.ErrNotFound) {
return 0, nil
}
return 0, err return 0, err
} }
return peerBalance.balance, nil return balance, nil
} }
// get the balance storage key for the given peer // get the balance storage key for the given peer
...@@ -252,63 +276,27 @@ func peerBalanceKey(peer swarm.Address) string { ...@@ -252,63 +276,27 @@ func peerBalanceKey(peer swarm.Address) string {
return fmt.Sprintf("%s%s", balancesPrefix, peer.String()) return fmt.Sprintf("%s%s", balancesPrefix, peer.String())
} }
// getPeerBalance gets the PeerBalance for a given peer // getAccountingPeer gets the accountingPeer for a given swarm address
// If not in memory it will try to load it from the state store // If not in memory it will initialize it
// if not found it will initialise it with 0 balance func (a *Accounting) getAccountingPeer(peer swarm.Address) (*accountingPeer, error) {
func (a *Accounting) getPeerBalance(peer swarm.Address) (*PeerBalance, error) { a.accountingPeersMu.Lock()
a.balancesMu.Lock() defer a.accountingPeersMu.Unlock()
defer a.balancesMu.Unlock()
peerBalance, ok := a.balances[peer.String()] peerData, ok := a.accountingPeers[peer.String()]
if !ok { if !ok {
// balance not yet in memory, load from state store peerData = &accountingPeer{
var balance int64 reservedBalance: 0,
err := a.store.Get(peerBalanceKey(peer), &balance)
if err == nil {
peerBalance = &PeerBalance{
balance: balance,
reserved: 0,
}
} else if err == storage.ErrNotFound {
// no prior records in state store
peerBalance = &PeerBalance{
balance: 0,
reserved: 0,
}
} else {
// other error in state store
return nil, err
} }
a.accountingPeers[peer.String()] = peerData
a.balances[peer.String()] = peerBalance
} }
return peerBalance, nil return peerData, nil
} }
// Balances gets balances for all peers, first from memory, than completing from store // Balances gets balances for all peers from store
func (a *Accounting) Balances() (map[string]int64, error) { func (a *Accounting) Balances() (map[string]int64, error) {
peersBalances := make(map[string]int64) s := make(map[string]int64)
err := a.store.Iterate(balancesPrefix, func(key, val []byte) (stop bool, err error) {
// get peer balances from store first as it may be outdated
// compared to the in memory map
if err := a.balancesFromStore(peersBalances); err != nil {
return nil, err
}
a.balancesMu.Lock()
for peer, balance := range a.balances {
peersBalances[peer] = balance.balance
}
a.balancesMu.Unlock()
return peersBalances, nil
}
// Get balances from store for keys (peers) that do not already exist in argument map.
// Used to get all balances not loaded in memory at the time the Balances() function is called.
func (a *Accounting) balancesFromStore(s map[string]int64) error {
return a.store.Iterate(balancesPrefix, func(key, val []byte) (stop bool, err error) {
addr, err := balanceKeyPeer(key) addr, err := balanceKeyPeer(key)
if err != nil { if err != nil {
return false, fmt.Errorf("parse address from key: %s: %v", string(key), err) return false, fmt.Errorf("parse address from key: %s: %v", string(key), err)
...@@ -324,6 +312,10 @@ func (a *Accounting) balancesFromStore(s map[string]int64) error { ...@@ -324,6 +312,10 @@ func (a *Accounting) balancesFromStore(s map[string]int64) error {
} }
return false, nil return false, nil
}) })
if err != nil {
return nil, err
}
return s, nil
} }
// get the embedded peer from the balance storage key // get the embedded peer from the balance storage key
...@@ -343,32 +335,22 @@ func balanceKeyPeer(key []byte) (swarm.Address, error) { ...@@ -343,32 +335,22 @@ func balanceKeyPeer(key []byte) (swarm.Address, error) {
return addr, nil return addr, nil
} }
// expectedBalance returns the balance we expect to have with a peer if all reserved funds are actually credited
func (pb *PeerBalance) expectedBalance() int64 {
return pb.balance - int64(pb.reserved)
}
// expectedDebt returns the debt we expect to have with a peer if all reserved funds are actually credited
func (pb *PeerBalance) expectedDebt() uint64 {
expectedBalance := pb.expectedBalance()
if expectedBalance >= 0 {
return 0
}
return uint64(-expectedBalance)
}
// NotifyPayment is called by Settlement when we received payment // NotifyPayment is called by Settlement when we received payment
// Implements the PaymentObserver interface // Implements the PaymentObserver interface
func (a *Accounting) NotifyPayment(peer swarm.Address, amount uint64) error { func (a *Accounting) NotifyPayment(peer swarm.Address, amount uint64) error {
balance, err := a.getPeerBalance(peer) accountingPeer, err := a.getAccountingPeer(peer)
if err != nil { if err != nil {
return err return err
} }
balance.lock.Lock() accountingPeer.lock.Lock()
defer balance.lock.Unlock() defer accountingPeer.lock.Unlock()
nextBalance := balance.balance - int64(amount) currentBalance, err := a.Balance(peer)
if err != nil {
return err
}
nextBalance := currentBalance - int64(amount)
// don't allow a payment to put use more into debt than the tolerance // don't allow a payment to put use more into debt than the tolerance
// this is to prevent another node tricking us into settling by settling first (e.g. send a bouncing cheque to trigger an honest cheque in swap) // this is to prevent another node tricking us into settling by settling first (e.g. send a bouncing cheque to trigger an honest cheque in swap)
...@@ -383,7 +365,5 @@ func (a *Accounting) NotifyPayment(peer swarm.Address, amount uint64) error { ...@@ -383,7 +365,5 @@ func (a *Accounting) NotifyPayment(peer swarm.Address, amount uint64) error {
return fmt.Errorf("failed to persist balance: %w", err) return fmt.Errorf("failed to persist balance: %w", err)
} }
balance.balance = nextBalance
return nil return nil
} }
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment