Commit 51532f37 authored by metacertain's avatar metacertain Committed by GitHub

fix: non-blocking calls in pseudosettle (#2135)

parent 154f7255
...@@ -91,8 +91,7 @@ type accountingPeer struct { ...@@ -91,8 +91,7 @@ type accountingPeer struct {
paymentThreshold *big.Int // the threshold at which the peer expects us to pay paymentThreshold *big.Int // the threshold at which the peer expects us to pay
refreshTimestamp int64 // last time we attempted time-based settlement refreshTimestamp int64 // last time we attempted time-based settlement
paymentOngoing bool // indicate if we are currently settling with the peer paymentOngoing bool // indicate if we are currently settling with the peer
reconnectAllowTimestamp int64 lastSettlementFailureTimestamp int64 // time of last unsuccessful attempt to issue a cheque
lastSettlementFailureTimestamp int64 // time of last unsuccessful attempt to issue a cheque
connected bool connected bool
} }
...@@ -1079,15 +1078,6 @@ func (a *Accounting) Connect(peer swarm.Address) { ...@@ -1079,15 +1078,6 @@ func (a *Accounting) Connect(peer swarm.Address) {
if err != nil { if err != nil {
a.logger.Errorf("failed to persist surplus balance: %w", err) a.logger.Errorf("failed to persist surplus balance: %w", err)
} }
if accountingPeer.reconnectAllowTimestamp != 0 {
timeNow := a.timeNow().Unix()
if timeNow < accountingPeer.reconnectAllowTimestamp {
disconnectFor := accountingPeer.reconnectAllowTimestamp - timeNow
a.metrics.AccountingDisconnectsReconnectCount.Inc()
_ = a.p2p.Blocklist(peer, time.Duration(disconnectFor)*time.Second)
}
}
} }
// decreaseOriginatedBalanceTo decreases the originated balance to provided limit or 0 if limit is positive // decreaseOriginatedBalanceTo decreases the originated balance to provided limit or 0 if limit is positive
...@@ -1144,15 +1134,14 @@ func (a *Accounting) Disconnect(peer swarm.Address) { ...@@ -1144,15 +1134,14 @@ func (a *Accounting) Disconnect(peer swarm.Address) {
accountingPeer.lock.Lock() accountingPeer.lock.Lock()
defer accountingPeer.lock.Unlock() defer accountingPeer.lock.Unlock()
timeNow := a.timeNow().Unix() if accountingPeer.connected {
disconnectFor, err := a.blocklistUntil(peer, 1)
disconnectFor, err := a.blocklistUntil(peer, 1) if err != nil {
if err != nil { disconnectFor = int64(60)
disconnectFor = int64(60) }
accountingPeer.connected = false
_ = a.p2p.Blocklist(peer, time.Duration(disconnectFor)*time.Second)
} }
timestamp := timeNow + disconnectFor
accountingPeer.connected = false
accountingPeer.reconnectAllowTimestamp = timestamp
} }
func (a *Accounting) SetRefreshFunc(f RefreshFunc) { func (a *Accounting) SetRefreshFunc(f RefreshFunc) {
......
...@@ -1405,31 +1405,13 @@ func TestAccountingReconnectBeforeAllowed(t *testing.T) { ...@@ -1405,31 +1405,13 @@ func TestAccountingReconnectBeforeAllowed(t *testing.T) {
acc.Disconnect(peer) acc.Disconnect(peer)
if blocklistTime != 0 {
t.Fatal("unexpected blocklist")
}
//peer attempts to reconnect immediately
acc.Connect(peer)
if blocklistTime != int64(4*paymentThresholdInRefreshmentSeconds) { if blocklistTime != int64(4*paymentThresholdInRefreshmentSeconds) {
t.Fatalf("unexpected blocklisting time, got %v expected %v", blocklistTime, 4*paymentThresholdInRefreshmentSeconds) t.Fatalf("unexpected blocklisting time, got %v expected %v", blocklistTime, 4*paymentThresholdInRefreshmentSeconds)
} }
// 30 seconds pass, check whether we blocklist for the correct leftover time after a later connect attempt
ts = int64(1030)
acc.SetTime(ts)
acc.Connect(peer)
if blocklistTime != int64(paymentThresholdInRefreshmentSeconds) {
t.Fatalf("unexpected blocklisting time, got %v expected %v", blocklistTime, paymentThresholdInRefreshmentSeconds)
}
} }
func TestAccountingReconnectAfterAllowed(t *testing.T) { func TestAccountingResetBalanceAfterReconnect(t *testing.T) {
logger := logging.New(ioutil.Discard, 0) logger := logging.New(ioutil.Discard, 0)
store := mock.NewStateStore() store := mock.NewStateStore()
...@@ -1437,6 +1419,8 @@ func TestAccountingReconnectAfterAllowed(t *testing.T) { ...@@ -1437,6 +1419,8 @@ func TestAccountingReconnectAfterAllowed(t *testing.T) {
var blocklistTime int64 var blocklistTime int64
paymentThresholdInRefreshmentSeconds := new(big.Int).Div(testPaymentThreshold, big.NewInt(testRefreshRate)).Uint64()
f := func(s swarm.Address, t time.Duration) error { f := func(s swarm.Address, t time.Duration) error {
blocklistTime = int64(t.Seconds()) blocklistTime = int64(t.Seconds())
return nil return nil
...@@ -1488,16 +1472,28 @@ func TestAccountingReconnectAfterAllowed(t *testing.T) { ...@@ -1488,16 +1472,28 @@ func TestAccountingReconnectAfterAllowed(t *testing.T) {
acc.Disconnect(peer) acc.Disconnect(peer)
if blocklistTime != 0 { if blocklistTime != int64(4*paymentThresholdInRefreshmentSeconds) {
t.Fatal("unexpected blocklist") t.Fatalf("unexpected blocklisting time, got %v expected %v", blocklistTime, 4*paymentThresholdInRefreshmentSeconds)
} }
ts = int64(1040)
acc.SetTime(ts)
acc.Connect(peer) acc.Connect(peer)
if blocklistTime != 0 { balance, err := acc.Balance(peer)
t.Fatalf("unexpected blocklisting time, got %v expected %v", blocklistTime, 0) if err != nil {
t.Fatal(err)
} }
if balance.Int64() != 0 {
t.Fatalf("balance for peer %v not as expected got %d, wanted 0", peer.String(), balance)
}
surplusBalance, err := acc.SurplusBalance(peer)
if err != nil {
t.Fatal(err)
}
if surplusBalance.Int64() != 0 {
t.Fatalf("surplus balance for peer %v not as expected got %d, wanted 0", peer.String(), balance)
}
} }
...@@ -102,7 +102,7 @@ func (s *Service) init(ctx context.Context, p p2p.Peer) error { ...@@ -102,7 +102,7 @@ func (s *Service) init(ctx context.Context, p p2p.Peer) error {
s.peers[p.Address.String()] = peerData s.peers[p.Address.String()] = peerData
} }
s.accounting.Connect(p.Address) go s.accounting.Connect(p.Address)
return nil return nil
} }
...@@ -112,7 +112,7 @@ func (s *Service) terminate(p p2p.Peer) error { ...@@ -112,7 +112,7 @@ func (s *Service) terminate(p p2p.Peer) error {
delete(s.peers, p.Address.String()) delete(s.peers, p.Address.String())
s.accounting.Disconnect(p.Address) go s.accounting.Disconnect(p.Address)
return nil return nil
} }
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment