Commit 0f889d8d authored by metacertain's avatar metacertain Committed by GitHub

feat: Always reset balances to zero after disconnection or blocklisting (#1983)

parent 0a861f1a
......@@ -46,7 +46,7 @@ type Interface interface {
// Credit increases the balance the peer has with us (we "pay" the peer).
Credit(peer swarm.Address, price uint64, originated bool) error
// PrepareDebit returns an accounting Action for the later debit to be executed on and to implement shadowing a possibly credited part of reserve on the other side.
PrepareDebit(peer swarm.Address, price uint64) Action
PrepareDebit(peer swarm.Address, price uint64) (Action, error)
// Balance returns the current balance for the given peer.
Balance(peer swarm.Address) (*big.Int, error)
// SurplusBalance returns the current surplus balance for the given peer.
......@@ -87,10 +87,13 @@ type accountingPeer struct {
lock sync.Mutex // lock to be held during any accounting action for this peer
reservedBalance *big.Int // amount currently reserved for active peer interaction
shadowReservedBalance *big.Int // amount potentially to be debited for active peer interaction
ghostBalance *big.Int // amount potentially could have been debited for but was not
paymentThreshold *big.Int // the threshold at which the peer expects us to pay
refreshTimestamp int64 // last time we attempted time-based settlement
paymentOngoing bool // indicate if we are currently settling with the peer
lastSettlementFailureTimestamp int64 // time of last unsuccessful attempt to issue a cheque
reconnectAllowTimestamp int64
lastSettlementFailureTimestamp int64 // time of last unsuccessful attempt to issue a cheque
connected bool
}
// Accounting is the main implementation of the accounting interface.
......@@ -120,6 +123,7 @@ type Accounting struct {
pricing pricing.Interface
metrics metrics
wg sync.WaitGroup
p2p p2p.Service
timeNow func() time.Time
}
......@@ -143,6 +147,8 @@ func NewAccounting(
Store storage.StateStorer,
Pricing pricing.Interface,
refreshRate *big.Int,
p2pService p2p.Service,
) (*Accounting, error) {
return &Accounting{
accountingPeers: make(map[string]*accountingPeer),
......@@ -157,6 +163,7 @@ func NewAccounting(
refreshRate: refreshRate,
timeNow: time.Now,
minimumPayment: new(big.Int).Div(refreshRate, big.NewInt(minimumPaymentDivisor)),
p2p: p2pService,
}, nil
}
......@@ -165,6 +172,11 @@ func (a *Accounting) Reserve(ctx context.Context, peer swarm.Address, price uint
accountingPeer := a.getAccountingPeer(peer)
accountingPeer.lock.Lock()
if !accountingPeer.connected {
return fmt.Errorf("connection not initialized yet")
}
defer accountingPeer.lock.Unlock()
a.metrics.AccountingReserveCount.Inc()
......@@ -485,8 +497,10 @@ func (a *Accounting) getAccountingPeer(peer swarm.Address) *accountingPeer {
peerData = &accountingPeer{
reservedBalance: big.NewInt(0),
shadowReservedBalance: big.NewInt(0),
ghostBalance: big.NewInt(0),
// initially assume the peer has the same threshold as us
paymentThreshold: new(big.Int).Set(a.paymentThreshold),
connected: false,
}
a.accountingPeers[peer.String()] = peerData
}
......@@ -633,6 +647,36 @@ func (a *Accounting) PeerDebt(peer swarm.Address) (*big.Int, error) {
return peerDebt, nil
}
// peerLatentDebt returns the sum of the positive part of the outstanding balance, shadow reserve and the ghost balance
func (a *Accounting) peerLatentDebt(peer swarm.Address) (*big.Int, error) {
accountingPeer := a.getAccountingPeer(peer)
balance := new(big.Int)
zero := big.NewInt(0)
err := a.store.Get(peerBalanceKey(peer), &balance)
if err != nil {
if !errors.Is(err, storage.ErrNotFound) {
return nil, err
}
balance = big.NewInt(0)
}
if balance.Cmp(zero) < 0 {
balance.Set(zero)
}
peerDebt := new(big.Int).Add(balance, accountingPeer.shadowReservedBalance)
peerLatentDebt := new(big.Int).Add(peerDebt, accountingPeer.ghostBalance)
if peerLatentDebt.Cmp(zero) < 0 {
return zero, nil
}
return peerLatentDebt, nil
}
// shadowBalance returns the current debt reduced by any potentially debitable amount stored in shadowReservedBalance
// this represents how much less our debt could potentially be seen by the other party if it's ahead with processing credits corresponding to our shadow reserve
func (a *Accounting) shadowBalance(peer swarm.Address) (shadowBalance *big.Int, err error) {
......@@ -827,12 +871,16 @@ func (a *Accounting) NotifyRefreshmentReceived(peer swarm.Address, amount *big.I
}
// PrepareDebit prepares a debit operation by increasing the shadowReservedBalance
func (a *Accounting) PrepareDebit(peer swarm.Address, price uint64) Action {
func (a *Accounting) PrepareDebit(peer swarm.Address, price uint64) (Action, error) {
accountingPeer := a.getAccountingPeer(peer)
accountingPeer.lock.Lock()
defer accountingPeer.lock.Unlock()
if !accountingPeer.connected {
return nil, fmt.Errorf("connection not initialized yet")
}
bigPrice := new(big.Int).SetUint64(price)
accountingPeer.shadowReservedBalance = new(big.Int).Add(accountingPeer.shadowReservedBalance, bigPrice)
......@@ -843,7 +891,7 @@ func (a *Accounting) PrepareDebit(peer swarm.Address, price uint64) Action {
peer: peer,
accountingPeer: accountingPeer,
applied: false,
}
}, nil
}
func (a *Accounting) increaseBalance(peer swarm.Address, accountingPeer *accountingPeer, price *big.Int) (*big.Int, error) {
......@@ -940,7 +988,13 @@ func (d *debitAction) Apply() error {
if nextBalance.Cmp(a.disconnectLimit) >= 0 {
// peer too much in debt
a.metrics.AccountingDisconnectsCount.Inc()
return p2p.NewBlockPeerError(24*time.Hour, ErrDisconnectThresholdExceeded)
disconnectFor, err := a.blocklistUntil(d.peer, 1)
if err != nil {
return p2p.NewBlockPeerError(1*time.Minute, ErrDisconnectThresholdExceeded)
}
return p2p.NewBlockPeerError(time.Duration(disconnectFor), ErrDisconnectThresholdExceeded)
}
return nil
......@@ -951,7 +1005,75 @@ func (d *debitAction) Cleanup() {
if !d.applied {
d.accountingPeer.lock.Lock()
defer d.accountingPeer.lock.Unlock()
a := d.accounting
d.accountingPeer.shadowReservedBalance = new(big.Int).Sub(d.accountingPeer.shadowReservedBalance, d.price)
d.accountingPeer.ghostBalance = new(big.Int).Add(d.accountingPeer.ghostBalance, d.price)
if d.accountingPeer.ghostBalance.Cmp(a.disconnectLimit) > 0 {
_ = a.blocklist(d.peer, 1)
}
}
}
func (a *Accounting) blocklistUntil(peer swarm.Address, multiplier int64) (int64, error) {
debt, err := a.peerLatentDebt(peer)
if err != nil {
return 0, err
}
if debt.Cmp(a.refreshRate) < 0 {
debt.Set(a.refreshRate)
}
additionalDebt := new(big.Int).Add(debt, a.paymentThreshold)
multiplyDebt := new(big.Int).Mul(additionalDebt, big.NewInt(multiplier))
k := new(big.Int).Div(multiplyDebt, a.refreshRate)
kInt := k.Int64()
return kInt, nil
}
func (a *Accounting) blocklist(peer swarm.Address, multiplier int64) error {
disconnectFor, err := a.blocklistUntil(peer, multiplier)
if err != nil {
return a.p2p.Blocklist(peer, 1*time.Minute)
}
return a.p2p.Blocklist(peer, time.Duration(disconnectFor)*time.Second)
}
func (a *Accounting) Connect(peer swarm.Address) {
accountingPeer := a.getAccountingPeer(peer)
zero := big.NewInt(0)
accountingPeer.lock.Lock()
defer accountingPeer.lock.Unlock()
accountingPeer.connected = true
accountingPeer.shadowReservedBalance.Set(zero)
accountingPeer.ghostBalance.Set(zero)
accountingPeer.reservedBalance.Set(zero)
err := a.store.Put(peerBalanceKey(peer), zero)
if err != nil {
a.logger.Errorf("failed to persist balance: %w", err)
}
err = a.store.Put(peerSurplusBalanceKey(peer), zero)
if err != nil {
a.logger.Errorf("failed to persist surplus balance: %w", err)
}
if accountingPeer.reconnectAllowTimestamp != 0 {
timeNow := a.timeNow().Unix()
if timeNow < accountingPeer.reconnectAllowTimestamp {
disconnectFor := accountingPeer.reconnectAllowTimestamp - timeNow
_ = a.p2p.Blocklist(peer, time.Duration(disconnectFor)*time.Second)
}
}
}
......@@ -1003,6 +1125,23 @@ func (a *Accounting) decreaseOriginatedBalanceBy(peer swarm.Address, amount *big
return nil
}
func (a *Accounting) Disconnect(peer swarm.Address) {
accountingPeer := a.getAccountingPeer(peer)
accountingPeer.lock.Lock()
defer accountingPeer.lock.Unlock()
timeNow := a.timeNow().Unix()
disconnectFor, err := a.blocklistUntil(peer, 1)
if err != nil {
disconnectFor = int64(60)
}
timestamp := timeNow + disconnectFor
accountingPeer.connected = false
accountingPeer.reconnectAllowTimestamp = timestamp
}
func (a *Accounting) SetRefreshFunc(f RefreshFunc) {
a.refreshFunction = f
}
......
This diff is collapsed.
......@@ -22,7 +22,7 @@ type Service struct {
reserveFunc func(ctx context.Context, peer swarm.Address, price uint64) error
releaseFunc func(peer swarm.Address, price uint64)
creditFunc func(peer swarm.Address, price uint64, orig bool) error
prepareDebitFunc func(peer swarm.Address, price uint64) accounting.Action
prepareDebitFunc func(peer swarm.Address, price uint64) (accounting.Action, error)
balanceFunc func(swarm.Address) (*big.Int, error)
shadowBalanceFunc func(swarm.Address) (*big.Int, error)
balancesFunc func() (map[string]*big.Int, error)
......@@ -61,7 +61,7 @@ func WithCreditFunc(f func(peer swarm.Address, price uint64, orig bool) error) O
}
// WithDebitFunc sets the mock Debit function
func WithPrepareDebitFunc(f func(peer swarm.Address, price uint64) accounting.Action) Option {
func WithPrepareDebitFunc(f func(peer swarm.Address, price uint64) (accounting.Action, error)) Option {
return optionFunc(func(s *Service) {
s.prepareDebitFunc = f
})
......@@ -144,7 +144,7 @@ func (s *Service) Credit(peer swarm.Address, price uint64, orig bool) error {
}
// Debit is the mock function wrapper that calls the set implementation
func (s *Service) PrepareDebit(peer swarm.Address, price uint64) accounting.Action {
func (s *Service) PrepareDebit(peer swarm.Address, price uint64) (accounting.Action, error) {
if s.prepareDebitFunc != nil {
return s.prepareDebitFunc(peer, price)
}
......@@ -155,7 +155,7 @@ func (s *Service) PrepareDebit(peer swarm.Address, price uint64) accounting.Acti
price: bigPrice,
peer: peer,
applied: false,
}
}, nil
}
......@@ -227,6 +227,14 @@ func (s *Service) CompensatedBalances() (map[string]*big.Int, error) {
return s.balances, nil
}
func (s *Service) Connect(peer swarm.Address) {
}
func (s *Service) Disconnect(peer swarm.Address) {
}
//
func (s *Service) SurplusBalance(peer swarm.Address) (*big.Int, error) {
if s.balanceFunc != nil {
......
......@@ -522,6 +522,7 @@ func NewBee(addr string, swarmAddress swarm.Address, publicKey ecdsa.PublicKey,
stateStore,
pricing,
big.NewInt(refreshRate),
p2ps,
)
if err != nil {
return nil, fmt.Errorf("accounting: %w", err)
......
......@@ -163,7 +163,10 @@ func (ps *PushSync) handler(ctx context.Context, p p2p.Peer, stream p2p.Stream)
return fmt.Errorf("chunk store: %w", err)
}
debit := ps.accounting.PrepareDebit(p.Address, price)
debit, err := ps.accounting.PrepareDebit(p.Address, price)
if err != nil {
return fmt.Errorf("prepare debit to peer %s before writeback: %w", p.Address.String(), err)
}
defer debit.Cleanup()
// return back receipt
......@@ -308,7 +311,10 @@ func (ps *PushSync) handler(ctx context.Context, p p2p.Peer, stream p2p.Stream)
}
// return back receipt
debit := ps.accounting.PrepareDebit(p.Address, price)
debit, err := ps.accounting.PrepareDebit(p.Address, price)
if err != nil {
return fmt.Errorf("prepare debit to peer %s before writeback: %w", p.Address.String(), err)
}
defer debit.Cleanup()
receipt := pb.Receipt{Address: chunk.Address().Bytes(), Signature: signature}
......@@ -322,7 +328,10 @@ func (ps *PushSync) handler(ctx context.Context, p p2p.Peer, stream p2p.Stream)
}
debit := ps.accounting.PrepareDebit(p.Address, price)
debit, err := ps.accounting.PrepareDebit(p.Address, price)
if err != nil {
return fmt.Errorf("prepare debit to peer %s before writeback: %w", p.Address.String(), err)
}
defer debit.Cleanup()
// pass back the receipt
......
......@@ -429,7 +429,10 @@ func (s *Service) handler(ctx context.Context, p p2p.Peer, stream p2p.Stream) (e
}
chunkPrice := s.pricer.Price(chunk.Address())
debit := s.accounting.PrepareDebit(p.Address, chunkPrice)
debit, err := s.accounting.PrepareDebit(p.Address, chunkPrice)
if err != nil {
return fmt.Errorf("prepare debit to peer %s before writeback: %w", p.Address.String(), err)
}
defer debit.Cleanup()
if err := w.WriteMsgWithContext(ctx, &pb.Delivery{
......
......@@ -32,4 +32,6 @@ type Accounting interface {
NotifyPaymentReceived(peer swarm.Address, amount *big.Int) error
NotifyPaymentSent(peer swarm.Address, amount *big.Int, receivedError error)
NotifyRefreshmentReceived(peer swarm.Address, amount *big.Int) error
Connect(peer swarm.Address)
Disconnect(peer swarm.Address)
}
......@@ -102,6 +102,7 @@ func (s *Service) init(ctx context.Context, p p2p.Peer) error {
s.peers[p.Address.String()] = peerData
}
s.accounting.Connect(p.Address)
return nil
}
......@@ -110,6 +111,8 @@ func (s *Service) terminate(p p2p.Peer) error {
defer s.peersMu.Unlock()
delete(s.peers, p.Address.String())
s.accounting.Disconnect(p.Address)
return nil
}
......
......@@ -61,6 +61,14 @@ func (t *testObserver) PeerDebt(peer swarm.Address) (*big.Int, error) {
return nil, errors.New("Peer not listed")
}
func (t *testObserver) Connect(peer swarm.Address) {
}
func (t *testObserver) Disconnect(peer swarm.Address) {
}
func (t *testObserver) NotifyRefreshmentReceived(peer swarm.Address, amount *big.Int) error {
t.receivedCalled <- notifyPaymentReceivedCall{
peer: peer,
......
......@@ -82,6 +82,14 @@ func (t *testObserver) NotifyPaymentSent(peer swarm.Address, amount *big.Int, er
}
}
func (t *testObserver) Connect(peer swarm.Address) {
}
func (t *testObserver) Disconnect(peer swarm.Address) {
}
type addressbookMock struct {
beneficiary func(peer swarm.Address) (beneficiary common.Address, known bool, err error)
chequebook func(peer swarm.Address) (chequebookAddress common.Address, known bool, err error)
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment