Commit c9ff1221 authored by Ralph Pichler's avatar Ralph Pichler Committed by GitHub

accounting: settle in reserve (#791)

parent 4fd07fd7
...@@ -28,12 +28,12 @@ var ( ...@@ -28,12 +28,12 @@ var (
// Interface is the Accounting interface. // Interface is the Accounting interface.
type Interface interface { type Interface interface {
// Reserve reserves a portion of the balance for peer. Returns an error if // Reserve reserves a portion of the balance for peer and attempts settlements if necessary.
// the operation risks exceeding the disconnect threshold. // Returns an error if the operation risks exceeding the disconnect threshold or an attempted settlement failed.
// //
// This should be called (always in combination with Release) before a // This has to be called (always in combination with Release) before a
// Credit action to prevent overspending in case of concurrent requests. // Credit action to prevent overspending in case of concurrent requests.
Reserve(peer swarm.Address, price uint64) error Reserve(ctx context.Context, peer swarm.Address, price uint64) error
// Release releases the reserved funds. // Release releases the reserved funds.
Release(peer swarm.Address, price uint64) Release(peer swarm.Address, price uint64)
// Credit increases the balance the peer has with us (we "pay" the peer). // Credit increases the balance the peer has with us (we "pay" the peer).
...@@ -116,8 +116,8 @@ func NewAccounting( ...@@ -116,8 +116,8 @@ func NewAccounting(
}, nil }, nil
} }
// Reserve reserves a portion of the balance for peer. // Reserve reserves a portion of the balance for peer and attempts settlements if necessary.
func (a *Accounting) Reserve(peer swarm.Address, price uint64) error { func (a *Accounting) Reserve(ctx context.Context, peer swarm.Address, price uint64) error {
accountingPeer, err := a.getAccountingPeer(peer) accountingPeer, err := a.getAccountingPeer(peer)
if err != nil { if err != nil {
return err return err
...@@ -133,32 +133,54 @@ func (a *Accounting) Reserve(peer swarm.Address, price uint64) error { ...@@ -133,32 +133,54 @@ func (a *Accounting) Reserve(peer swarm.Address, price uint64) error {
} }
} }
// Check for safety of increase of reservedBalance by price
if accountingPeer.reservedBalance+price < accountingPeer.reservedBalance {
return ErrOverflow
}
nextReserved := accountingPeer.reservedBalance + price
// Subtract already reserved amount from actual balance, to get expected balance // Subtract already reserved amount from actual balance, to get expected balance
expectedBalance, err := subtractI64mU64(currentBalance, accountingPeer.reservedBalance) expectedBalance, err := subtractI64mU64(currentBalance, nextReserved)
if err != nil { if err != nil {
return err return err
} }
// Determine if we owe anything to the peer, if we owe less than 0, we conclude we owe nothing // Determine if we will owe anything to the peer, if we owe less than 0, we conclude we owe nothing
// This conversion is made safe by previous subtractI64mU64 not allowing MinInt64 // This conversion is made safe by previous subtractI64mU64 not allowing MinInt64
expectedDebt := -expectedBalance expectedDebt := -expectedBalance
if expectedDebt < 0 { if expectedDebt < 0 {
expectedDebt = 0 expectedDebt = 0
} }
// Check if the expected debt is already over the payment threshold. threshold := accountingPeer.paymentThreshold
if uint64(expectedDebt) > a.paymentThreshold { if threshold > a.earlyPayment {
a.metrics.AccountingBlocksCount.Inc() threshold -= a.earlyPayment
return ErrOverdraft } else {
threshold = 0
} }
// Check for safety of increase of reservedBalance by price // If our expected debt is less than earlyPayment away from our payment threshold
if accountingPeer.reservedBalance+price < accountingPeer.reservedBalance { // and we are actually in debt, trigger settlement.
return ErrOverflow // we pay early to avoid needlessly blocking request later when concurrent requests occur and we are already close to the payment threshold.
if expectedDebt >= int64(threshold) && currentBalance < 0 {
err = a.settle(ctx, peer, accountingPeer)
if err != nil {
return fmt.Errorf("failed to settle with peer %v: %v", peer, err)
}
// if we settled successfully our balance is back at 0
// and the expected debt therefore equals next reserved amount
expectedDebt = int64(nextReserved)
} }
accountingPeer.reservedBalance += price // if expectedDebt would still exceed the paymentThreshold at this point block this request
// this can happen if there is a large number of concurrent requests to the same peer
if expectedDebt > int64(a.paymentThreshold) {
a.metrics.AccountingBlocksCount.Inc()
return ErrOverdraft
}
accountingPeer.reservedBalance = nextReserved
return nil return nil
} }
...@@ -208,20 +230,6 @@ func (a *Accounting) Credit(peer swarm.Address, price uint64) error { ...@@ -208,20 +230,6 @@ func (a *Accounting) Credit(peer swarm.Address, price uint64) error {
a.logger.Tracef("crediting peer %v with price %d, new balance is %d", peer, price, nextBalance) a.logger.Tracef("crediting peer %v with price %d, new balance is %d", peer, price, nextBalance)
// Get expectedbalance by safely decreasing current balance with reserved amounts
expectedBalance, err := subtractI64mU64(currentBalance, accountingPeer.reservedBalance)
if err != nil {
return err
}
// Compute expected debt before update because reserve still includes the
// amount that is deducted from the balance.
// This conversion is made safe by previous subtractI64mU64 not allowing MinInt64
expectedDebt := -expectedBalance
if expectedDebt < 0 {
expectedDebt = 0
}
err = a.store.Put(peerBalanceKey(peer), nextBalance) err = a.store.Put(peerBalanceKey(peer), nextBalance)
if err != nil { if err != nil {
return fmt.Errorf("failed to persist balance: %w", err) return fmt.Errorf("failed to persist balance: %w", err)
...@@ -229,30 +237,12 @@ func (a *Accounting) Credit(peer swarm.Address, price uint64) error { ...@@ -229,30 +237,12 @@ func (a *Accounting) Credit(peer swarm.Address, price uint64) error {
a.metrics.TotalCreditedAmount.Add(float64(price)) a.metrics.TotalCreditedAmount.Add(float64(price))
a.metrics.CreditEventsCount.Inc() a.metrics.CreditEventsCount.Inc()
// If our expected debt is less than earlyPayment away from our payment threshold (which we assume is
// also the peers payment threshold), trigger settlement.
// we pay early to avoid needlessly blocking request later when concurrent requests occur and we are already close to the payment threshold
threshold := accountingPeer.paymentThreshold
if threshold > a.earlyPayment {
threshold -= a.earlyPayment
} else {
threshold = 0
}
if uint64(expectedDebt) >= threshold {
err = a.settle(peer, accountingPeer)
if err != nil {
a.logger.Errorf("failed to settle with peer %v: %v", peer, err)
}
}
return nil return nil
} }
// Settle all debt with a peer. The lock on the accountingPeer must be held when // Settle all debt with a peer. The lock on the accountingPeer must be held when
// called. // called.
func (a *Accounting) settle(peer swarm.Address, balance *accountingPeer) error { func (a *Accounting) settle(ctx context.Context, peer swarm.Address, balance *accountingPeer) error {
oldBalance, err := a.Balance(peer) oldBalance, err := a.Balance(peer)
if err != nil { if err != nil {
if !errors.Is(err, ErrPeerNoBalance) { if !errors.Is(err, ErrPeerNoBalance) {
...@@ -284,7 +274,7 @@ func (a *Accounting) settle(peer swarm.Address, balance *accountingPeer) error { ...@@ -284,7 +274,7 @@ func (a *Accounting) settle(peer swarm.Address, balance *accountingPeer) error {
return fmt.Errorf("failed to persist balance: %w", err) return fmt.Errorf("failed to persist balance: %w", err)
} }
err = a.settlement.Pay(context.Background(), peer, paymentAmount) err = a.settlement.Pay(ctx, peer, paymentAmount)
if err != nil { if err != nil {
err = fmt.Errorf("settlement for amount %d failed: %w", paymentAmount, err) err = fmt.Errorf("settlement for amount %d failed: %w", paymentAmount, err)
// If the payment didn't succeed we should restore the old balance in // If the payment didn't succeed we should restore the old balance in
......
This diff is collapsed.
...@@ -5,6 +5,7 @@ ...@@ -5,6 +5,7 @@
package mock package mock
import ( import (
"context"
"sync" "sync"
"github.com/ethersphere/bee/pkg/accounting" "github.com/ethersphere/bee/pkg/accounting"
...@@ -15,7 +16,7 @@ import ( ...@@ -15,7 +16,7 @@ import (
type Service struct { type Service struct {
lock sync.Mutex lock sync.Mutex
balances map[string]int64 balances map[string]int64
reserveFunc func(peer swarm.Address, price uint64) error reserveFunc func(ctx context.Context, peer swarm.Address, price uint64) error
releaseFunc func(peer swarm.Address, price uint64) releaseFunc func(peer swarm.Address, price uint64)
creditFunc func(peer swarm.Address, price uint64) error creditFunc func(peer swarm.Address, price uint64) error
debitFunc func(peer swarm.Address, price uint64) error debitFunc func(peer swarm.Address, price uint64) error
...@@ -24,7 +25,7 @@ type Service struct { ...@@ -24,7 +25,7 @@ type Service struct {
} }
// WithReserveFunc sets the mock Reserve function // WithReserveFunc sets the mock Reserve function
func WithReserveFunc(f func(peer swarm.Address, price uint64) error) Option { func WithReserveFunc(f func(ctx context.Context, peer swarm.Address, price uint64) error) Option {
return optionFunc(func(s *Service) { return optionFunc(func(s *Service) {
s.reserveFunc = f s.reserveFunc = f
}) })
...@@ -76,9 +77,9 @@ func NewAccounting(opts ...Option) accounting.Interface { ...@@ -76,9 +77,9 @@ func NewAccounting(opts ...Option) accounting.Interface {
} }
// Reserve is the mock function wrapper that calls the set implementation // Reserve is the mock function wrapper that calls the set implementation
func (s *Service) Reserve(peer swarm.Address, price uint64) error { func (s *Service) Reserve(ctx context.Context, peer swarm.Address, price uint64) error {
if s.reserveFunc != nil { if s.reserveFunc != nil {
return s.reserveFunc(peer, price) return s.reserveFunc(ctx, peer, price)
} }
return nil return nil
} }
......
...@@ -122,7 +122,7 @@ func (ps *PushSync) handler(ctx context.Context, p p2p.Peer, stream p2p.Stream) ...@@ -122,7 +122,7 @@ func (ps *PushSync) handler(ctx context.Context, p p2p.Peer, stream p2p.Stream)
// compute the price we pay for this receipt and reserve it for the rest of this function // compute the price we pay for this receipt and reserve it for the rest of this function
receiptPrice := ps.pricer.PeerPrice(peer, chunk.Address()) receiptPrice := ps.pricer.PeerPrice(peer, chunk.Address())
err = ps.accounting.Reserve(peer, receiptPrice) err = ps.accounting.Reserve(ctx, peer, receiptPrice)
if err != nil { if err != nil {
return fmt.Errorf("reserve balance for peer %s: %w", peer.String(), err) return fmt.Errorf("reserve balance for peer %s: %w", peer.String(), err)
} }
...@@ -235,7 +235,7 @@ func (ps *PushSync) PushChunkToClosest(ctx context.Context, ch swarm.Chunk) (*Re ...@@ -235,7 +235,7 @@ func (ps *PushSync) PushChunkToClosest(ctx context.Context, ch swarm.Chunk) (*Re
// compute the price we pay for this receipt and reserve it for the rest of this function // compute the price we pay for this receipt and reserve it for the rest of this function
receiptPrice := ps.pricer.PeerPrice(peer, ch.Address()) receiptPrice := ps.pricer.PeerPrice(peer, ch.Address())
err = ps.accounting.Reserve(peer, receiptPrice) err = ps.accounting.Reserve(ctx, peer, receiptPrice)
if err != nil { if err != nil {
return nil, fmt.Errorf("reserve balance for peer %s: %w", peer.String(), err) return nil, fmt.Errorf("reserve balance for peer %s: %w", peer.String(), err)
} }
......
...@@ -138,7 +138,7 @@ func (s *Service) retrieveChunk(ctx context.Context, addr swarm.Address, skipPee ...@@ -138,7 +138,7 @@ func (s *Service) retrieveChunk(ctx context.Context, addr swarm.Address, skipPee
// compute the price we pay for this chunk and reserve it for the rest of this function // compute the price we pay for this chunk and reserve it for the rest of this function
chunkPrice := s.pricer.PeerPrice(peer, addr) chunkPrice := s.pricer.PeerPrice(peer, addr)
err = s.accounting.Reserve(peer, chunkPrice) err = s.accounting.Reserve(ctx, peer, chunkPrice)
if err != nil { if err != nil {
return nil, peer, err return nil, peer, err
} }
......
...@@ -23,6 +23,8 @@ type Service struct { ...@@ -23,6 +23,8 @@ type Service struct {
settlementsSentFunc func() (map[string]uint64, error) settlementsSentFunc func() (map[string]uint64, error)
settlementsRecvFunc func() (map[string]uint64, error) settlementsRecvFunc func() (map[string]uint64, error)
payFunc func(context.Context, swarm.Address, uint64) error
} }
// WithsettlementFunc sets the mock settlement function // WithsettlementFunc sets the mock settlement function
...@@ -51,6 +53,12 @@ func WithSettlementsRecvFunc(f func() (map[string]uint64, error)) Option { ...@@ -51,6 +53,12 @@ func WithSettlementsRecvFunc(f func() (map[string]uint64, error)) Option {
}) })
} }
func WithPayFunc(f func(context.Context, swarm.Address, uint64) error) Option {
return optionFunc(func(s *Service) {
s.payFunc = f
})
}
// Newsettlement creates the mock settlement implementation // Newsettlement creates the mock settlement implementation
func NewSettlement(opts ...Option) settlement.Interface { func NewSettlement(opts ...Option) settlement.Interface {
mock := new(Service) mock := new(Service)
...@@ -62,7 +70,10 @@ func NewSettlement(opts ...Option) settlement.Interface { ...@@ -62,7 +70,10 @@ func NewSettlement(opts ...Option) settlement.Interface {
return mock return mock
} }
func (s *Service) Pay(_ context.Context, peer swarm.Address, amount uint64) error { func (s *Service) Pay(c context.Context, peer swarm.Address, amount uint64) error {
if s.payFunc != nil {
return s.payFunc(c, peer, amount)
}
s.settlementsSent[peer.String()] += amount s.settlementsSent[peer.String()] += amount
return nil return nil
} }
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment