From 88bca84eb0ed88910b2586eca8e99bfcd02db44e Mon Sep 17 00:00:00 2001
From: Felipe Andrade <felipe@oplabs.co>
Date: Sat, 27 May 2023 04:31:05 -0700
Subject: [PATCH] extract candidates logic, refactor update backend

---
 proxyd/consensus_poller.go                 | 223 +++++++++++----------
 proxyd/integration_tests/consensus_test.go |   2 +-
 2 files changed, 123 insertions(+), 102 deletions(-)

diff --git a/proxyd/consensus_poller.go b/proxyd/consensus_poller.go
index decacd3f2..9ed28a408 100644
--- a/proxyd/consensus_poller.go
+++ b/proxyd/consensus_poller.go
@@ -57,6 +57,10 @@ type backendState struct {
 	bannedUntil time.Time
 }
 
+func (bs backendState) IsBanned() bool {
+	return time.Now().Before(bs.bannedUntil)
+}
+
 // GetConsensusGroup returns the backend members that are agreeing in a consensus
 func (cp *ConsensusPoller) GetConsensusGroup() []*Backend {
 	defer cp.consensusGroupMux.Unlock()
@@ -242,10 +246,10 @@ func NewConsensusPoller(bg *BackendGroup, opts ...ConsensusOpt) *ConsensusPoller
 
 // UpdateBackend refreshes the consensus state of a single backend
 func (cp *ConsensusPoller) UpdateBackend(ctx context.Context, be *Backend) {
-	banned := cp.IsBanned(be)
-	RecordConsensusBackendBanned(be, banned)
+	bs := cp.getBackendState(be)
+	RecordConsensusBackendBanned(be, bs.IsBanned())
 
-	if banned {
+	if bs.IsBanned() {
 		log.Debug("skipping backend - banned", "backend", be.Name)
 		return
 	}
@@ -287,24 +291,39 @@ func (cp *ConsensusPoller) UpdateBackend(ctx context.Context, be *Backend) {
 		log.Warn("error updating backend - finalized block", "name", be.Name, "err", err)
 	}
 
-	bs := cp.getBackendState(be)
 	oldFinalized := bs.finalizedBlockNumber
 	oldSafe := bs.safeBlockNumber
 
-	expectedBlockTags := cp.checkExpectedBlockTags(
-		finalizedBlockNumber, oldFinalized,
-		safeBlockNumber, oldSafe,
-		latestBlockNumber)
+	updateDelay := time.Since(bs.lastUpdate)
+	RecordConsensusBackendUpdateDelay(be, updateDelay)
 
-	changed, updateDelay := cp.setBackendState(be, peerCount, inSync,
+	changed := cp.setBackendState(be, peerCount, inSync,
 		latestBlockNumber, latestBlockHash,
 		finalizedBlockNumber, safeBlockNumber)
 
 	RecordBackendLatestBlock(be, latestBlockNumber)
 	RecordBackendSafeBlock(be, safeBlockNumber)
 	RecordBackendFinalizedBlock(be, finalizedBlockNumber)
+
+	if changed {
+		log.Debug("backend state updated",
+			"name", be.Name,
+			"peerCount", peerCount,
+			"inSync", inSync,
+			"latestBlockNumber", latestBlockNumber,
+			"latestBlockHash", latestBlockHash,
+			"finalizedBlockNumber", finalizedBlockNumber,
+			"safeBlockNumber", safeBlockNumber,
+			"updateDelay", updateDelay)
+	}
+
+	// sanity check for latest, safe and finalized block tags
+	expectedBlockTags := cp.checkExpectedBlockTags(
+		finalizedBlockNumber, oldFinalized,
+		safeBlockNumber, oldSafe,
+		latestBlockNumber)
+
 	RecordBackendUnexpectedBlockTags(be, !expectedBlockTags)
-	RecordConsensusBackendUpdateDelay(be, updateDelay)
 
 	if !expectedBlockTags {
 		log.Warn("backend banned - unexpected block tags",
@@ -317,18 +336,6 @@ func (cp *ConsensusPoller) UpdateBackend(ctx context.Context, be *Backend) {
 		)
 		cp.Ban(be)
 	}
-
-	if changed {
-		log.Debug("backend state updated",
-			"name", be.Name,
-			"peerCount", peerCount,
-			"inSync", inSync,
-			"latestBlockNumber", latestBlockNumber,
-			"latestBlockHash", latestBlockHash,
-			"finalizedBlockNumber", finalizedBlockNumber,
-			"safeBlockNumber", safeBlockNumber,
-			"updateDelay", updateDelay)
-	}
 }
 
 // checkExpectedBlockTags for unexpected conditions on block tags
@@ -349,101 +356,41 @@ func (cp *ConsensusPoller) UpdateBackendGroupConsensus(ctx context.Context) {
 	// get the latest block number from the tracker
 	currentConsensusBlockNumber := cp.GetLatestBlockNumber()
 
-	// find out what backends are the candidates to be in the consensus group
-	// and create a copy of current their state
-	//
-	// a serving node needs to be:
-	//	- not banned
-	//	- healthy (network latency and error rate)
-	//	- with minimum peer count
-	//	- in sync
-	//	- updated recently
-	// 	- not lagging latest block
-
-	candidates := make(map[*Backend]*backendState, len(cp.backendGroup.Backends))
-	filteredBackendsNames := make([]string, 0, len(cp.backendGroup.Backends))
-	for _, be := range cp.backendGroup.Backends {
-		bs := cp.getBackendState(be)
-		passed := true
-		if time.Now().Before(bs.bannedUntil) {
-			passed = false
-		}
-		if !be.IsHealthy() {
-			passed = false
-		}
-		if !be.skipPeerCountCheck && bs.peerCount < cp.minPeerCount {
-			passed = false
-		}
-		if !bs.inSync {
-			passed = false
-		}
-		if bs.lastUpdate.Add(cp.maxUpdateThreshold).Before(time.Now()) {
-			passed = false
-		}
-		if passed {
-			candidates[be] = bs
-		} else {
-			filteredBackendsNames = append(filteredBackendsNames, be.Name)
-		}
-	}
-
-	// find the highest block, in order to use it defining the highest non-lagging ancestor block
-	var highestLatestBlock hexutil.Uint64
-	for _, bs := range candidates {
-		if bs.latestBlockNumber > highestLatestBlock {
-			highestLatestBlock = bs.latestBlockNumber
-		}
-	}
+	// get the candidates for the consensus group
+	candidates := cp.getConsensusCandidates()
 
-	// find the highest common ancestor block
+	// update the lowest latest block number and hash
+	//        the lowest safe block number
+	//        the lowest finalized block number
 	var lowestLatestBlock hexutil.Uint64
 	var lowestLatestBlockHash string
 	var lowestFinalizedBlock hexutil.Uint64
 	var lowestSafeBlock hexutil.Uint64
-	lagging := make([]*Backend, 0, len(candidates))
-	for be, bs := range candidates {
-		// check if backend is lagging behind the highest block
-		if bs.latestBlockNumber < highestLatestBlock && uint64(highestLatestBlock-bs.latestBlockNumber) > cp.maxBlockLag {
-			lagging = append(lagging, be)
-			continue
-		}
-
-		// update the lowest common ancestor block
+	for _, bs := range candidates {
 		if lowestLatestBlock == 0 || bs.latestBlockNumber < lowestLatestBlock {
 			lowestLatestBlock = bs.latestBlockNumber
 			lowestLatestBlockHash = bs.latestBlockHash
 		}
-
-		// update the lowest finalized block
 		if lowestFinalizedBlock == 0 || bs.finalizedBlockNumber < lowestFinalizedBlock {
 			lowestFinalizedBlock = bs.finalizedBlockNumber
 		}
-
-		// update the lowest safe block
 		if lowestSafeBlock == 0 || bs.safeBlockNumber < lowestSafeBlock {
 			lowestSafeBlock = bs.safeBlockNumber
 		}
 	}
 
-	// remove lagging backends from the candidates
-	for _, be := range lagging {
-		filteredBackendsNames = append(filteredBackendsNames, be.Name)
-		delete(candidates, be)
-	}
-
 	// find the proposed block among the candidates
+	// the proposed block needs have the same hash in the entire consensus group
 	proposedBlock := lowestLatestBlock
 	proposedBlockHash := lowestLatestBlockHash
 	hasConsensus := false
+	broken := false
 
 	if lowestLatestBlock > currentConsensusBlockNumber {
 		log.Debug("validating consensus on block", "lowestLatestBlock", lowestLatestBlock)
 	}
 
-	// if there is no block to propose, the consensus is automatically broken
-	// this can happen when backends have just recovered
-	broken := proposedBlock == 0 && currentConsensusBlockNumber > 0
-
+	// if there is a block to propose, check if it is the same in all backends
 	if proposedBlock > 0 {
 		for !hasConsensus {
 			allAgreed := true
@@ -459,7 +406,12 @@ func (cp *ConsensusPoller) UpdateBackendGroupConsensus(ctx context.Context) {
 				blocksDontMatch := (actualBlockNumber != proposedBlock) || (actualBlockHash != proposedBlockHash)
 				if blocksDontMatch {
 					if currentConsensusBlockNumber >= actualBlockNumber {
-						log.Warn("backend broke consensus", "name", be.Name, "blockNum", actualBlockNumber, "proposedBlockNum", proposedBlock, "blockHash", actualBlockHash, "proposedBlockHash", proposedBlockHash)
+						log.Warn("backend broke consensus",
+							"name", be.Name,
+							"actualBlockNumber", actualBlockNumber,
+							"actualBlockHash", actualBlockHash,
+							"proposedBlock", proposedBlock,
+							"proposedBlockHash", proposedBlockHash)
 						broken = true
 					}
 					allAgreed = false
@@ -488,6 +440,7 @@ func (cp *ConsensusPoller) UpdateBackendGroupConsensus(ctx context.Context) {
 			"proposedBlockHash", proposedBlockHash)
 	}
 
+	// update tracker
 	cp.tracker.SetLatestBlockNumber(proposedBlock)
 	cp.tracker.SetSafeBlockNumber(lowestSafeBlock)
 	cp.tracker.SetFinalizedBlockNumber(lowestFinalizedBlock)
@@ -495,10 +448,17 @@ func (cp *ConsensusPoller) UpdateBackendGroupConsensus(ctx context.Context) {
 	// update consensus group
 	group := make([]*Backend, 0, len(candidates))
 	consensusBackendsNames := make([]string, 0, len(candidates))
-	for be, _ := range candidates {
-		group = append(group, be)
-		consensusBackendsNames = append(consensusBackendsNames, be.Name)
+	filteredBackendsNames := make([]string, 0, len(cp.backendGroup.Backends))
+	for _, be := range cp.backendGroup.Backends {
+		_, exist := candidates[be]
+		if exist {
+			group = append(group, be)
+			consensusBackendsNames = append(consensusBackendsNames, be.Name)
+		} else {
+			filteredBackendsNames = append(filteredBackendsNames, be.Name)
+		}
 	}
+
 	cp.consensusGroupMux.Lock()
 	cp.consensusGroup = group
 	cp.consensusGroupMux.Unlock()
@@ -522,7 +482,7 @@ func (cp *ConsensusPoller) IsBanned(be *Backend) bool {
 	bs := cp.backendState[be]
 	defer bs.backendStateMux.Unlock()
 	bs.backendStateMux.Lock()
-	return time.Now().Before(bs.bannedUntil)
+	return bs.IsBanned()
 }
 
 // Ban bans a specific backend
@@ -531,6 +491,9 @@ func (cp *ConsensusPoller) Ban(be *Backend) {
 	defer bs.backendStateMux.Unlock()
 	bs.backendStateMux.Lock()
 	bs.bannedUntil = time.Now().Add(cp.banPeriod)
+
+	// when we ban a node, we give it the chance to start from any block when it is back
+	bs.latestBlockNumber = 0
 	bs.safeBlockNumber = 0
 	bs.finalizedBlockNumber = 0
 }
@@ -613,12 +576,12 @@ func (cp *ConsensusPoller) isInSync(ctx context.Context, be *Backend) (result bo
 	return res, nil
 }
 
+// getBackendState creates a copy of backend state so that the caller can use it without locking
 func (cp *ConsensusPoller) getBackendState(be *Backend) *backendState {
 	bs := cp.backendState[be]
 	defer bs.backendStateMux.Unlock()
 	bs.backendStateMux.Lock()
 
-	// we return a copy so that the caller can use it without locking
 	return &backendState{
 		latestBlockNumber:    bs.latestBlockNumber,
 		latestBlockHash:      bs.latestBlockHash,
@@ -634,18 +597,76 @@ func (cp *ConsensusPoller) getBackendState(be *Backend) *backendState {
 func (cp *ConsensusPoller) setBackendState(be *Backend, peerCount uint64, inSync bool,
 	latestBlockNumber hexutil.Uint64, latestBlockHash string,
 	finalizedBlockNumber hexutil.Uint64,
-	safeBlockNumber hexutil.Uint64) (changed bool, updateDelay time.Duration) {
+	safeBlockNumber hexutil.Uint64) bool {
 	bs := cp.backendState[be]
 	bs.backendStateMux.Lock()
-	changed = bs.latestBlockHash != latestBlockHash
+	changed := bs.latestBlockHash != latestBlockHash
 	bs.peerCount = peerCount
 	bs.inSync = inSync
 	bs.latestBlockNumber = latestBlockNumber
 	bs.latestBlockHash = latestBlockHash
 	bs.finalizedBlockNumber = finalizedBlockNumber
 	bs.safeBlockNumber = safeBlockNumber
-	updateDelay = time.Since(bs.lastUpdate)
 	bs.lastUpdate = time.Now()
 	bs.backendStateMux.Unlock()
-	return
+	return changed
+}
+
+// getConsensusCandidates find out what backends are the candidates to be in the consensus group
+// and create a copy of current their state
+//
+// a candidate is a serving node within the following conditions:
+//   - not banned
+//   - healthy (network latency and error rate)
+//   - with minimum peer count
+//   - in sync
+//   - updated recently
+//   - not lagging latest block
+func (cp *ConsensusPoller) getConsensusCandidates() map[*Backend]*backendState {
+	candidates := make(map[*Backend]*backendState, len(cp.backendGroup.Backends))
+
+	for _, be := range cp.backendGroup.Backends {
+		bs := cp.getBackendState(be)
+		if bs.IsBanned() {
+			continue
+		}
+		if !be.IsHealthy() {
+			continue
+		}
+		if !be.skipPeerCountCheck && bs.peerCount < cp.minPeerCount {
+			continue
+		}
+		if !bs.inSync {
+			continue
+		}
+		if bs.lastUpdate.Add(cp.maxUpdateThreshold).Before(time.Now()) {
+			continue
+		}
+
+		candidates[be] = bs
+	}
+
+	// find the highest block, in order to use it defining the highest non-lagging ancestor block
+	var highestLatestBlock hexutil.Uint64
+	for _, bs := range candidates {
+		if bs.latestBlockNumber > highestLatestBlock {
+			highestLatestBlock = bs.latestBlockNumber
+		}
+	}
+
+	// find the highest common ancestor block
+	lagging := make([]*Backend, 0, len(candidates))
+	for be, bs := range candidates {
+		// check if backend is lagging behind the highest block
+		if bs.latestBlockNumber < highestLatestBlock && uint64(highestLatestBlock-bs.latestBlockNumber) > cp.maxBlockLag {
+			lagging = append(lagging, be)
+		}
+	}
+
+	// remove lagging backends from the candidates
+	for _, be := range lagging {
+		delete(candidates, be)
+	}
+
+	return candidates
 }
diff --git a/proxyd/integration_tests/consensus_test.go b/proxyd/integration_tests/consensus_test.go
index e8927d2a6..98196c9b2 100644
--- a/proxyd/integration_tests/consensus_test.go
+++ b/proxyd/integration_tests/consensus_test.go
@@ -428,7 +428,7 @@ func TestConsensus(t *testing.T) {
 		require.Equal(t, "0xb1", bg.Consensus.GetSafeBlockNumber().String())
 	})
 
-	t.Run("latest dropped below safe, and stayed inconsistent after ban", func(t *testing.T) {
+	t.Run("latest dropped below safe, and stayed inconsistent", func(t *testing.T) {
 		reset()
 		useOnlyNode1()
 		overrideBlock("node1", "latest", "0xd1")
-- 
2.23.0