Commit 050102b9 authored by metacertain's avatar metacertain Committed by GitHub

feat: maintain originated balance (#1870)

parent b1c98fdc
......@@ -23,9 +23,10 @@ import (
)
var (
_ Interface = (*Accounting)(nil)
balancesPrefix string = "accounting_balance_"
balancesSurplusPrefix string = "accounting_surplusbalance_"
_ Interface = (*Accounting)(nil)
balancesPrefix = "accounting_balance_"
balancesSurplusPrefix = "accounting_surplusbalance_"
balancesOriginatedPrefix = "accounting_originatedbalance_"
// fraction of the refresh rate that is the minimum for monetary settlement
// this value is chosen so that tiny payments are prevented while still allowing small payments in environments with lower payment thresholds
minimumPaymentDivisor = int64(5)
......@@ -42,7 +43,7 @@ type Interface interface {
// Release releases the reserved funds.
Release(peer swarm.Address, price uint64)
// Credit increases the balance the peer has with us (we "pay" the peer).
Credit(peer swarm.Address, price uint64) error
Credit(peer swarm.Address, price uint64, originated bool) error
// PrepareDebit returns an accounting Action for the later debit to be executed on and to implement shadowing a possibly credited part of reserve on the other side.
PrepareDebit(peer swarm.Address, price uint64) Action
// Balance returns the current balance for the given peer.
......@@ -242,7 +243,7 @@ func (a *Accounting) Release(peer swarm.Address, price uint64) {
// Credit increases the amount of credit we have with the given peer
// (and decreases existing debt).
func (a *Accounting) Credit(peer swarm.Address, price uint64) error {
func (a *Accounting) Credit(peer swarm.Address, price uint64, originated bool) error {
accountingPeer := a.getAccountingPeer(peer)
accountingPeer.lock.Lock()
......@@ -267,6 +268,41 @@ func (a *Accounting) Credit(peer swarm.Address, price uint64) error {
a.metrics.TotalCreditedAmount.Add(float64(price))
a.metrics.CreditEventsCount.Inc()
if !originated {
return nil
}
originBalance, err := a.OriginatedBalance(peer)
if err != nil && !errors.Is(err, ErrPeerNoBalance) {
return fmt.Errorf("failed to load originated balance: %w", err)
}
// Calculate next balance by decreasing current balance with the price we credit
nextOriginBalance := new(big.Int).Sub(originBalance, new(big.Int).SetUint64(price))
a.logger.Tracef("crediting peer %v with price %d, new originated balance is %d", peer, price, nextOriginBalance)
zero := big.NewInt(0)
// only consider negative balance for limiting originated balance
if nextBalance.Cmp(zero) > 0 {
nextBalance.Set(zero)
}
// If originated balance is more into the negative domain, set it to balance
if nextOriginBalance.Cmp(nextBalance) < 0 {
nextOriginBalance.Set(nextBalance)
a.logger.Tracef("decreasing originated balance to peer %v to current balance %d", peer, nextOriginBalance)
}
err = a.store.Put(originatedBalanceKey(peer), nextOriginBalance)
if err != nil {
return fmt.Errorf("failed to persist originated balance: %w", err)
}
a.metrics.TotalOriginatedCreditedAmount.Add(float64(price))
a.metrics.OriginatedCreditEventsCount.Inc()
return nil
}
......@@ -314,12 +350,24 @@ func (a *Accounting) settle(peer swarm.Address, balance *accountingPeer) error {
if err != nil {
return fmt.Errorf("settle: failed to persist balance: %w", err)
}
err = a.decreaseOriginatedBalanceTo(peer, oldBalance)
if err != nil {
return fmt.Errorf("settle: failed to decrease originated balance: %w", err)
}
}
if a.payFunction != nil && !balance.paymentOngoing {
// if there is no monetary settlement happening, check if there is something to settle
// compute debt excluding debt created by incoming payments
paymentAmount := new(big.Int).Neg(oldBalance)
originatedBalance, err := a.OriginatedBalance(peer)
if err != nil {
if !errors.Is(err, ErrPeerNoBalance) {
return fmt.Errorf("failed to load originated balance to settle: %w", err)
}
}
paymentAmount := new(big.Int).Neg(originatedBalance)
// if the remaining debt is still larger than some minimum amount, trigger monetary settlement
if paymentAmount.Cmp(a.minimumPayment) >= 0 {
balance.paymentOngoing = true
......@@ -346,6 +394,20 @@ func (a *Accounting) Balance(peer swarm.Address) (balance *big.Int, err error) {
return balance, nil
}
// Balance returns the current balance for the given peer.
func (a *Accounting) OriginatedBalance(peer swarm.Address) (balance *big.Int, err error) {
err = a.store.Get(originatedBalanceKey(peer), &balance)
if err != nil {
if errors.Is(err, storage.ErrNotFound) {
return big.NewInt(0), ErrPeerNoBalance
}
return nil, err
}
return balance, nil
}
// SurplusBalance returns the current balance for the given peer.
func (a *Accounting) SurplusBalance(peer swarm.Address) (balance *big.Int, err error) {
err = a.store.Get(peerSurplusBalanceKey(peer), &balance)
......@@ -398,6 +460,10 @@ func peerSurplusBalanceKey(peer swarm.Address) string {
return fmt.Sprintf("%s%s", balancesSurplusPrefix, peer.String())
}
func originatedBalanceKey(peer swarm.Address) string {
return fmt.Sprintf("%s%s", balancesOriginatedPrefix, peer.String())
}
// getAccountingPeer returns the accountingPeer for a given swarm address.
// If not found in memory it will initialize it.
func (a *Accounting) getAccountingPeer(peer swarm.Address) *accountingPeer {
......@@ -628,6 +694,12 @@ func (a *Accounting) NotifyPaymentSent(peer swarm.Address, amount *big.Int, rece
a.logger.Errorf("accounting: notifypaymentsent failed to persist balance: %v", err)
return
}
err = a.decreaseOriginatedBalanceBy(peer, amount)
if err != nil {
a.logger.Warningf("accounting: notifypaymentsent failed to decrease originated balance: %v", err)
}
}
// NotifyPaymentThreshold should be called to notify accounting of changes in the payment threshold
......@@ -823,6 +895,11 @@ func (a *Accounting) increaseBalance(peer swarm.Address, accountingPeer *account
return nil, fmt.Errorf("failed to persist balance: %w", err)
}
err = a.decreaseOriginatedBalanceTo(peer, nextBalance)
if err != nil {
a.logger.Warningf("increase balance: failed to decrease originated balance: %v", err)
}
return nextBalance, nil
}
......@@ -866,6 +943,54 @@ func (d *debitAction) Cleanup() {
}
}
// decreaseOriginatedBalanceTo decreases the originated balance to provided limit or 0 if limit is positive
func (a *Accounting) decreaseOriginatedBalanceTo(peer swarm.Address, limit *big.Int) error {
zero := big.NewInt(0)
toSet := new(big.Int).Set(limit)
originatedBalance, err := a.OriginatedBalance(peer)
if err != nil && !errors.Is(err, ErrPeerNoBalance) {
return fmt.Errorf("failed to load originated balance: %w", err)
}
if toSet.Cmp(zero) > 0 {
toSet.Set(zero)
}
// If originated balance is more into the negative domain, set it to limit
if originatedBalance.Cmp(toSet) < 0 {
err = a.store.Put(originatedBalanceKey(peer), toSet)
if err != nil {
return fmt.Errorf("failed to persist originated balance: %w", err)
}
a.logger.Tracef("decreasing originated balance to peer %v to current balance %d", peer, toSet)
}
return nil
}
// decreaseOriginatedBalanceTo decreases the originated balance by provided amount even below 0
func (a *Accounting) decreaseOriginatedBalanceBy(peer swarm.Address, amount *big.Int) error {
originatedBalance, err := a.OriginatedBalance(peer)
if err != nil && !errors.Is(err, ErrPeerNoBalance) {
return fmt.Errorf("failed to load balance: %w", err)
}
// Move originated balance into the positive domain by amount
newOriginatedBalance := new(big.Int).Add(originatedBalance, amount)
err = a.store.Put(originatedBalanceKey(peer), newOriginatedBalance)
if err != nil {
return fmt.Errorf("failed to persist originated balance: %w", err)
}
a.logger.Tracef("decreasing originated balance to peer %v by amount %d to current balance %d", peer, amount, newOriginatedBalance)
return nil
}
func (a *Accounting) SetRefreshFunc(f RefreshFunc) {
a.refreshFunction = f
}
......
......@@ -37,9 +37,13 @@ type paymentCall struct {
// booking represents an accounting action and the expected result afterwards
type booking struct {
peer swarm.Address
price int64 // Credit if <0, Debit otherwise
expectedBalance int64
peer swarm.Address
price int64 // Credit if <0, Debit otherwise
expectedBalance int64
originatedBalance int64
originatedCredit bool
amount int64
notifyPaymentSent bool
}
// TestAccountingAddBalance does several accounting actions and verifies the balance after each steep
......@@ -78,7 +82,7 @@ func TestAccountingAddBalance(t *testing.T) {
if err != nil {
t.Fatal(err)
}
err = acc.Credit(booking.peer, uint64(-booking.price))
err = acc.Credit(booking.peer, uint64(-booking.price), true)
if err != nil {
t.Fatal(err)
}
......@@ -103,6 +107,88 @@ func TestAccountingAddBalance(t *testing.T) {
}
}
// TestAccountingAddBalance does several accounting actions and verifies the balance after each steep
func TestAccountingAddOriginatedBalance(t *testing.T) {
logger := logging.New(ioutil.Discard, 0)
store := mock.NewStateStore()
defer store.Close()
acc, err := accounting.NewAccounting(testPaymentThreshold, testPaymentTolerance, testPaymentEarly, logger, store, nil, big.NewInt(testRefreshRate))
if err != nil {
t.Fatal(err)
}
peer1Addr, err := swarm.ParseHexAddress("00112233")
if err != nil {
t.Fatal(err)
}
bookings := []booking{
// originated credit
{peer: peer1Addr, price: -200, expectedBalance: -200, originatedBalance: -200, originatedCredit: true},
// forwarder credit
{peer: peer1Addr, price: -200, expectedBalance: -400, originatedBalance: -200, originatedCredit: false},
// inconsequential debit not moving balance closer to 0 than originbalance is to 0
{peer: peer1Addr, price: 100, expectedBalance: -300, originatedBalance: -200},
// consequential debit moving balance closer to 0 than originbalance, therefore also moving originated balance along
{peer: peer1Addr, price: 200, expectedBalance: -100, originatedBalance: -100},
// notifypaymentsent that moves originated balance into positive domain
{peer: peer1Addr, amount: 200, expectedBalance: 100, originatedBalance: 100, notifyPaymentSent: true},
// inconsequential debit because originated balance is in the positive domain
{peer: peer1Addr, price: 100, expectedBalance: 200, originatedBalance: 100},
// originated credit moving the originated balance back into the negative domain, should be limited to the expectedbalance
{peer: peer1Addr, price: -300, expectedBalance: -100, originatedBalance: -100, originatedCredit: true},
}
for i, booking := range bookings {
if booking.notifyPaymentSent {
acc.NotifyPaymentSent(booking.peer, big.NewInt(booking.amount), nil)
} else {
if booking.price < 0 {
err = acc.Reserve(context.Background(), booking.peer, uint64(-booking.price))
if err != nil {
t.Fatal(err)
}
err = acc.Credit(booking.peer, uint64(-booking.price), booking.originatedCredit)
if err != nil {
t.Fatal(err)
}
acc.Release(booking.peer, uint64(-booking.price))
} else {
debitAction := acc.PrepareDebit(booking.peer, uint64(booking.price))
err = debitAction.Apply()
if err != nil {
t.Fatal(err)
}
debitAction.Cleanup()
}
}
balance, err := acc.Balance(booking.peer)
if err != nil {
t.Fatal(err)
}
if balance.Int64() != booking.expectedBalance {
t.Fatalf("balance for peer %v not as expected after booking %d. got %d, wanted %d", booking.peer.String(), i, balance, booking.expectedBalance)
}
originatedBalance, err := acc.OriginatedBalance(booking.peer)
if err != nil {
t.Fatal(err)
}
if originatedBalance.Int64() != booking.originatedBalance {
t.Fatalf("originated balance for peer %v not as expected after booking %d. got %d, wanted %d", booking.peer.String(), i, originatedBalance, booking.originatedBalance)
}
}
}
// TestAccountingAdd_persistentBalances tests that balances are actually persisted
// It creates an accounting instance, does some accounting
// Then it creates a new accounting instance with the same store and verifies the balances
......@@ -136,7 +222,7 @@ func TestAccountingAdd_persistentBalances(t *testing.T) {
debitAction.Cleanup()
peer2CreditAmount := 2 * testPrice
err = acc.Credit(peer2Addr, peer2CreditAmount)
err = acc.Credit(peer2Addr, peer2CreditAmount, true)
if err != nil {
t.Fatal(err)
}
......@@ -269,8 +355,8 @@ func TestAccountingCallSettlement(t *testing.T) {
t.Fatal(err)
}
// Credit until payment threshold
err = acc.Credit(peer1Addr, requestPrice)
// Credit until payment treshold
err = acc.Credit(peer1Addr, requestPrice, true)
if err != nil {
t.Fatal(err)
}
......@@ -322,7 +408,7 @@ func TestAccountingCallSettlement(t *testing.T) {
t.Fatal(err)
}
err = acc.Credit(peer1Addr, expectedAmount)
err = acc.Credit(peer1Addr, expectedAmount, true)
if err != nil {
t.Fatal(err)
}
......@@ -393,8 +479,8 @@ func TestAccountingCallSettlementMonetary(t *testing.T) {
t.Fatal(err)
}
// Credit until payment threshold
err = acc.Credit(peer1Addr, requestPrice)
// Credit until payment treshold
err = acc.Credit(peer1Addr, requestPrice, true)
if err != nil {
t.Fatal(err)
}
......@@ -510,8 +596,8 @@ func TestAccountingCallSettlementTooSoon(t *testing.T) {
t.Fatal(err)
}
// Credit until payment threshold
err = acc.Credit(peer1Addr, requestPrice)
// Credit until payment treshold
err = acc.Credit(peer1Addr, requestPrice, true)
if err != nil {
t.Fatal(err)
}
......@@ -553,8 +639,8 @@ func TestAccountingCallSettlementTooSoon(t *testing.T) {
t.Fatal(err)
}
// Credit until payment threshold
err = acc.Credit(peer1Addr, requestPrice)
// Credit until payment treshold
err = acc.Credit(peer1Addr, requestPrice, true)
if err != nil {
t.Fatal(err)
}
......@@ -638,7 +724,7 @@ func TestAccountingCallSettlementEarly(t *testing.T) {
t.Fatal(err)
}
err = acc.Credit(peer1Addr, debt)
err = acc.Credit(peer1Addr, debt, true)
if err != nil {
t.Fatal(err)
}
......@@ -917,7 +1003,7 @@ func TestAccountingNotifyPaymentThreshold(t *testing.T) {
t.Fatal(err)
}
err = acc.Credit(peer1Addr, debt)
err = acc.Credit(peer1Addr, debt, true)
if err != nil {
t.Fatal(err)
}
......@@ -974,7 +1060,7 @@ func TestAccountingPeerDebt(t *testing.T) {
}
peer2Addr := swarm.MustParseHexAddress("11112233")
err = acc.Credit(peer2Addr, 500)
err = acc.Credit(peer2Addr, 500, true)
if err != nil {
t.Fatal(err)
}
......
......@@ -13,13 +13,15 @@ type metrics struct {
// all metrics fields must be exported
// to be able to return them by Metrics()
// using reflection
TotalDebitedAmount prometheus.Counter
TotalCreditedAmount prometheus.Counter
DebitEventsCount prometheus.Counter
CreditEventsCount prometheus.Counter
AccountingDisconnectsCount prometheus.Counter
AccountingBlocksCount prometheus.Counter
AccountingReserveCount prometheus.Counter
TotalDebitedAmount prometheus.Counter
TotalCreditedAmount prometheus.Counter
DebitEventsCount prometheus.Counter
CreditEventsCount prometheus.Counter
AccountingDisconnectsCount prometheus.Counter
AccountingBlocksCount prometheus.Counter
AccountingReserveCount prometheus.Counter
TotalOriginatedCreditedAmount prometheus.Counter
OriginatedCreditEventsCount prometheus.Counter
}
func newMetrics() metrics {
......@@ -68,6 +70,18 @@ func newMetrics() metrics {
Name: "accounting_reserve_count",
Help: "Number of reserve calls",
}),
TotalOriginatedCreditedAmount: prometheus.NewCounter(prometheus.CounterOpts{
Namespace: m.Namespace,
Subsystem: subsystem,
Name: "total_originated_credited_amount",
Help: "Amount of BZZ credited to peers (potential cost of the node) for originated traffic",
}),
OriginatedCreditEventsCount: prometheus.NewCounter(prometheus.CounterOpts{
Namespace: m.Namespace,
Subsystem: subsystem,
Name: "originated_credit_events_count",
Help: "Number of occurrences of BZZ credit events as originator towards peers",
}),
}
}
......
......@@ -21,7 +21,7 @@ type Service struct {
balances map[string]*big.Int
reserveFunc func(ctx context.Context, peer swarm.Address, price uint64) error
releaseFunc func(peer swarm.Address, price uint64)
creditFunc func(peer swarm.Address, price uint64) error
creditFunc func(peer swarm.Address, price uint64, orig bool) error
prepareDebitFunc func(peer swarm.Address, price uint64) accounting.Action
balanceFunc func(swarm.Address) (*big.Int, error)
shadowBalanceFunc func(swarm.Address) (*big.Int, error)
......@@ -54,7 +54,7 @@ func WithReleaseFunc(f func(peer swarm.Address, price uint64)) Option {
}
// WithCreditFunc sets the mock Credit function
func WithCreditFunc(f func(peer swarm.Address, price uint64) error) Option {
func WithCreditFunc(f func(peer swarm.Address, price uint64, orig bool) error) Option {
return optionFunc(func(s *Service) {
s.creditFunc = f
})
......@@ -128,9 +128,9 @@ func (s *Service) Release(peer swarm.Address, price uint64) {
}
// Credit is the mock function wrapper that calls the set implementation
func (s *Service) Credit(peer swarm.Address, price uint64) error {
func (s *Service) Credit(peer swarm.Address, price uint64, orig bool) error {
if s.creditFunc != nil {
return s.creditFunc(peer, price)
return s.creditFunc(peer, price, orig)
}
s.lock.Lock()
defer s.lock.Unlock()
......
......@@ -292,7 +292,7 @@ func (ps *PushSync) handler(ctx context.Context, p p2p.Peer, stream p2p.Stream)
return
}
err = ps.accounting.Credit(peer, receiptPrice)
err = ps.accounting.Credit(peer, receiptPrice, false)
}(peer)
......@@ -383,7 +383,7 @@ func (ps *PushSync) pushToClosest(ctx context.Context, ch swarm.Chunk, retryAllo
ctxd, canceld := context.WithTimeout(ctx, defaultTTL)
defer canceld()
r, attempted, err := ps.pushPeer(ctxd, peer, ch)
r, attempted, err := ps.pushPeer(ctxd, peer, ch, retryAllowed)
// attempted is true if we get past accounting and actually attempt
// to send the request to the peer. If we dont get past accounting, we
// should not count the retry and try with a different peer again
......@@ -420,7 +420,7 @@ func (ps *PushSync) pushToClosest(ctx context.Context, ch swarm.Chunk, retryAllo
return nil, ErrNoPush
}
func (ps *PushSync) pushPeer(ctx context.Context, peer swarm.Address, ch swarm.Chunk) (*pb.Receipt, bool, error) {
func (ps *PushSync) pushPeer(ctx context.Context, peer swarm.Address, ch swarm.Chunk, originated bool) (*pb.Receipt, bool, error) {
// compute the price we pay for this receipt and reserve it for the rest of this function
receiptPrice := ps.pricer.PeerPrice(peer, ch.Address())
......@@ -474,7 +474,7 @@ func (ps *PushSync) pushPeer(ctx context.Context, peer swarm.Address, ch swarm.C
return nil, true, fmt.Errorf("invalid receipt. chunk %s, peer %s", ch.Address(), peer)
}
err = ps.accounting.Credit(peer, receiptPrice)
err = ps.accounting.Credit(peer, receiptPrice, originated)
if err != nil {
return nil, true, err
}
......
......@@ -153,13 +153,14 @@ func (s *Service) RetrieveChunk(ctx context.Context, addr swarm.Address, origin
ctx, cancel := context.WithTimeout(ctx, retrieveChunkTimeout)
defer cancel()
chunk, peer, requested, err := s.retrieveChunk(ctx, addr, sp)
chunk, peer, requested, err := s.retrieveChunk(ctx, addr, sp, origin)
resultC <- retrievalResult{
chunk: chunk,
peer: peer,
err: err,
retrieved: requested,
}
}()
} else {
resultC <- retrievalResult{}
......@@ -225,7 +226,7 @@ func (s *Service) RetrieveChunk(ctx context.Context, addr swarm.Address, origin
return v.(swarm.Chunk), nil
}
func (s *Service) retrieveChunk(ctx context.Context, addr swarm.Address, sp *skipPeers) (chunk swarm.Chunk, peer swarm.Address, requested bool, err error) {
func (s *Service) retrieveChunk(ctx context.Context, addr swarm.Address, sp *skipPeers, originated bool) (chunk swarm.Chunk, peer swarm.Address, requested bool, err error) {
startTimer := time.Now()
v := ctx.Value(requestSourceContextKey{})
sourcePeerAddr := swarm.Address{}
......@@ -326,7 +327,7 @@ func (s *Service) retrieveChunk(ctx context.Context, addr swarm.Address, sp *ski
}
// credit the peer after successful delivery
err = s.accounting.Credit(peer, chunkPrice)
err = s.accounting.Credit(peer, chunkPrice, originated)
if err != nil {
return nil, peer, true, err
}
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment