Commit 88bca84e authored by Felipe Andrade's avatar Felipe Andrade

extract candidates logic, refactor update backend

parent 1270a43c
...@@ -57,6 +57,10 @@ type backendState struct { ...@@ -57,6 +57,10 @@ type backendState struct {
bannedUntil time.Time bannedUntil time.Time
} }
func (bs backendState) IsBanned() bool {
return time.Now().Before(bs.bannedUntil)
}
// GetConsensusGroup returns the backend members that are agreeing in a consensus // GetConsensusGroup returns the backend members that are agreeing in a consensus
func (cp *ConsensusPoller) GetConsensusGroup() []*Backend { func (cp *ConsensusPoller) GetConsensusGroup() []*Backend {
defer cp.consensusGroupMux.Unlock() defer cp.consensusGroupMux.Unlock()
...@@ -242,10 +246,10 @@ func NewConsensusPoller(bg *BackendGroup, opts ...ConsensusOpt) *ConsensusPoller ...@@ -242,10 +246,10 @@ func NewConsensusPoller(bg *BackendGroup, opts ...ConsensusOpt) *ConsensusPoller
// UpdateBackend refreshes the consensus state of a single backend // UpdateBackend refreshes the consensus state of a single backend
func (cp *ConsensusPoller) UpdateBackend(ctx context.Context, be *Backend) { func (cp *ConsensusPoller) UpdateBackend(ctx context.Context, be *Backend) {
banned := cp.IsBanned(be) bs := cp.getBackendState(be)
RecordConsensusBackendBanned(be, banned) RecordConsensusBackendBanned(be, bs.IsBanned())
if banned { if bs.IsBanned() {
log.Debug("skipping backend - banned", "backend", be.Name) log.Debug("skipping backend - banned", "backend", be.Name)
return return
} }
...@@ -287,24 +291,39 @@ func (cp *ConsensusPoller) UpdateBackend(ctx context.Context, be *Backend) { ...@@ -287,24 +291,39 @@ func (cp *ConsensusPoller) UpdateBackend(ctx context.Context, be *Backend) {
log.Warn("error updating backend - finalized block", "name", be.Name, "err", err) log.Warn("error updating backend - finalized block", "name", be.Name, "err", err)
} }
bs := cp.getBackendState(be)
oldFinalized := bs.finalizedBlockNumber oldFinalized := bs.finalizedBlockNumber
oldSafe := bs.safeBlockNumber oldSafe := bs.safeBlockNumber
expectedBlockTags := cp.checkExpectedBlockTags( updateDelay := time.Since(bs.lastUpdate)
finalizedBlockNumber, oldFinalized, RecordConsensusBackendUpdateDelay(be, updateDelay)
safeBlockNumber, oldSafe,
latestBlockNumber)
changed, updateDelay := cp.setBackendState(be, peerCount, inSync, changed := cp.setBackendState(be, peerCount, inSync,
latestBlockNumber, latestBlockHash, latestBlockNumber, latestBlockHash,
finalizedBlockNumber, safeBlockNumber) finalizedBlockNumber, safeBlockNumber)
RecordBackendLatestBlock(be, latestBlockNumber) RecordBackendLatestBlock(be, latestBlockNumber)
RecordBackendSafeBlock(be, safeBlockNumber) RecordBackendSafeBlock(be, safeBlockNumber)
RecordBackendFinalizedBlock(be, finalizedBlockNumber) RecordBackendFinalizedBlock(be, finalizedBlockNumber)
if changed {
log.Debug("backend state updated",
"name", be.Name,
"peerCount", peerCount,
"inSync", inSync,
"latestBlockNumber", latestBlockNumber,
"latestBlockHash", latestBlockHash,
"finalizedBlockNumber", finalizedBlockNumber,
"safeBlockNumber", safeBlockNumber,
"updateDelay", updateDelay)
}
// sanity check for latest, safe and finalized block tags
expectedBlockTags := cp.checkExpectedBlockTags(
finalizedBlockNumber, oldFinalized,
safeBlockNumber, oldSafe,
latestBlockNumber)
RecordBackendUnexpectedBlockTags(be, !expectedBlockTags) RecordBackendUnexpectedBlockTags(be, !expectedBlockTags)
RecordConsensusBackendUpdateDelay(be, updateDelay)
if !expectedBlockTags { if !expectedBlockTags {
log.Warn("backend banned - unexpected block tags", log.Warn("backend banned - unexpected block tags",
...@@ -317,18 +336,6 @@ func (cp *ConsensusPoller) UpdateBackend(ctx context.Context, be *Backend) { ...@@ -317,18 +336,6 @@ func (cp *ConsensusPoller) UpdateBackend(ctx context.Context, be *Backend) {
) )
cp.Ban(be) cp.Ban(be)
} }
if changed {
log.Debug("backend state updated",
"name", be.Name,
"peerCount", peerCount,
"inSync", inSync,
"latestBlockNumber", latestBlockNumber,
"latestBlockHash", latestBlockHash,
"finalizedBlockNumber", finalizedBlockNumber,
"safeBlockNumber", safeBlockNumber,
"updateDelay", updateDelay)
}
} }
// checkExpectedBlockTags for unexpected conditions on block tags // checkExpectedBlockTags for unexpected conditions on block tags
...@@ -349,101 +356,41 @@ func (cp *ConsensusPoller) UpdateBackendGroupConsensus(ctx context.Context) { ...@@ -349,101 +356,41 @@ func (cp *ConsensusPoller) UpdateBackendGroupConsensus(ctx context.Context) {
// get the latest block number from the tracker // get the latest block number from the tracker
currentConsensusBlockNumber := cp.GetLatestBlockNumber() currentConsensusBlockNumber := cp.GetLatestBlockNumber()
// find out what backends are the candidates to be in the consensus group // get the candidates for the consensus group
// and create a copy of current their state candidates := cp.getConsensusCandidates()
//
// a serving node needs to be:
// - not banned
// - healthy (network latency and error rate)
// - with minimum peer count
// - in sync
// - updated recently
// - not lagging latest block
candidates := make(map[*Backend]*backendState, len(cp.backendGroup.Backends))
filteredBackendsNames := make([]string, 0, len(cp.backendGroup.Backends))
for _, be := range cp.backendGroup.Backends {
bs := cp.getBackendState(be)
passed := true
if time.Now().Before(bs.bannedUntil) {
passed = false
}
if !be.IsHealthy() {
passed = false
}
if !be.skipPeerCountCheck && bs.peerCount < cp.minPeerCount {
passed = false
}
if !bs.inSync {
passed = false
}
if bs.lastUpdate.Add(cp.maxUpdateThreshold).Before(time.Now()) {
passed = false
}
if passed {
candidates[be] = bs
} else {
filteredBackendsNames = append(filteredBackendsNames, be.Name)
}
}
// find the highest block, in order to use it defining the highest non-lagging ancestor block
var highestLatestBlock hexutil.Uint64
for _, bs := range candidates {
if bs.latestBlockNumber > highestLatestBlock {
highestLatestBlock = bs.latestBlockNumber
}
}
// find the highest common ancestor block // update the lowest latest block number and hash
// the lowest safe block number
// the lowest finalized block number
var lowestLatestBlock hexutil.Uint64 var lowestLatestBlock hexutil.Uint64
var lowestLatestBlockHash string var lowestLatestBlockHash string
var lowestFinalizedBlock hexutil.Uint64 var lowestFinalizedBlock hexutil.Uint64
var lowestSafeBlock hexutil.Uint64 var lowestSafeBlock hexutil.Uint64
lagging := make([]*Backend, 0, len(candidates)) for _, bs := range candidates {
for be, bs := range candidates {
// check if backend is lagging behind the highest block
if bs.latestBlockNumber < highestLatestBlock && uint64(highestLatestBlock-bs.latestBlockNumber) > cp.maxBlockLag {
lagging = append(lagging, be)
continue
}
// update the lowest common ancestor block
if lowestLatestBlock == 0 || bs.latestBlockNumber < lowestLatestBlock { if lowestLatestBlock == 0 || bs.latestBlockNumber < lowestLatestBlock {
lowestLatestBlock = bs.latestBlockNumber lowestLatestBlock = bs.latestBlockNumber
lowestLatestBlockHash = bs.latestBlockHash lowestLatestBlockHash = bs.latestBlockHash
} }
// update the lowest finalized block
if lowestFinalizedBlock == 0 || bs.finalizedBlockNumber < lowestFinalizedBlock { if lowestFinalizedBlock == 0 || bs.finalizedBlockNumber < lowestFinalizedBlock {
lowestFinalizedBlock = bs.finalizedBlockNumber lowestFinalizedBlock = bs.finalizedBlockNumber
} }
// update the lowest safe block
if lowestSafeBlock == 0 || bs.safeBlockNumber < lowestSafeBlock { if lowestSafeBlock == 0 || bs.safeBlockNumber < lowestSafeBlock {
lowestSafeBlock = bs.safeBlockNumber lowestSafeBlock = bs.safeBlockNumber
} }
} }
// remove lagging backends from the candidates
for _, be := range lagging {
filteredBackendsNames = append(filteredBackendsNames, be.Name)
delete(candidates, be)
}
// find the proposed block among the candidates // find the proposed block among the candidates
// the proposed block needs have the same hash in the entire consensus group
proposedBlock := lowestLatestBlock proposedBlock := lowestLatestBlock
proposedBlockHash := lowestLatestBlockHash proposedBlockHash := lowestLatestBlockHash
hasConsensus := false hasConsensus := false
broken := false
if lowestLatestBlock > currentConsensusBlockNumber { if lowestLatestBlock > currentConsensusBlockNumber {
log.Debug("validating consensus on block", "lowestLatestBlock", lowestLatestBlock) log.Debug("validating consensus on block", "lowestLatestBlock", lowestLatestBlock)
} }
// if there is no block to propose, the consensus is automatically broken // if there is a block to propose, check if it is the same in all backends
// this can happen when backends have just recovered
broken := proposedBlock == 0 && currentConsensusBlockNumber > 0
if proposedBlock > 0 { if proposedBlock > 0 {
for !hasConsensus { for !hasConsensus {
allAgreed := true allAgreed := true
...@@ -459,7 +406,12 @@ func (cp *ConsensusPoller) UpdateBackendGroupConsensus(ctx context.Context) { ...@@ -459,7 +406,12 @@ func (cp *ConsensusPoller) UpdateBackendGroupConsensus(ctx context.Context) {
blocksDontMatch := (actualBlockNumber != proposedBlock) || (actualBlockHash != proposedBlockHash) blocksDontMatch := (actualBlockNumber != proposedBlock) || (actualBlockHash != proposedBlockHash)
if blocksDontMatch { if blocksDontMatch {
if currentConsensusBlockNumber >= actualBlockNumber { if currentConsensusBlockNumber >= actualBlockNumber {
log.Warn("backend broke consensus", "name", be.Name, "blockNum", actualBlockNumber, "proposedBlockNum", proposedBlock, "blockHash", actualBlockHash, "proposedBlockHash", proposedBlockHash) log.Warn("backend broke consensus",
"name", be.Name,
"actualBlockNumber", actualBlockNumber,
"actualBlockHash", actualBlockHash,
"proposedBlock", proposedBlock,
"proposedBlockHash", proposedBlockHash)
broken = true broken = true
} }
allAgreed = false allAgreed = false
...@@ -488,6 +440,7 @@ func (cp *ConsensusPoller) UpdateBackendGroupConsensus(ctx context.Context) { ...@@ -488,6 +440,7 @@ func (cp *ConsensusPoller) UpdateBackendGroupConsensus(ctx context.Context) {
"proposedBlockHash", proposedBlockHash) "proposedBlockHash", proposedBlockHash)
} }
// update tracker
cp.tracker.SetLatestBlockNumber(proposedBlock) cp.tracker.SetLatestBlockNumber(proposedBlock)
cp.tracker.SetSafeBlockNumber(lowestSafeBlock) cp.tracker.SetSafeBlockNumber(lowestSafeBlock)
cp.tracker.SetFinalizedBlockNumber(lowestFinalizedBlock) cp.tracker.SetFinalizedBlockNumber(lowestFinalizedBlock)
...@@ -495,10 +448,17 @@ func (cp *ConsensusPoller) UpdateBackendGroupConsensus(ctx context.Context) { ...@@ -495,10 +448,17 @@ func (cp *ConsensusPoller) UpdateBackendGroupConsensus(ctx context.Context) {
// update consensus group // update consensus group
group := make([]*Backend, 0, len(candidates)) group := make([]*Backend, 0, len(candidates))
consensusBackendsNames := make([]string, 0, len(candidates)) consensusBackendsNames := make([]string, 0, len(candidates))
for be, _ := range candidates { filteredBackendsNames := make([]string, 0, len(cp.backendGroup.Backends))
group = append(group, be) for _, be := range cp.backendGroup.Backends {
consensusBackendsNames = append(consensusBackendsNames, be.Name) _, exist := candidates[be]
if exist {
group = append(group, be)
consensusBackendsNames = append(consensusBackendsNames, be.Name)
} else {
filteredBackendsNames = append(filteredBackendsNames, be.Name)
}
} }
cp.consensusGroupMux.Lock() cp.consensusGroupMux.Lock()
cp.consensusGroup = group cp.consensusGroup = group
cp.consensusGroupMux.Unlock() cp.consensusGroupMux.Unlock()
...@@ -522,7 +482,7 @@ func (cp *ConsensusPoller) IsBanned(be *Backend) bool { ...@@ -522,7 +482,7 @@ func (cp *ConsensusPoller) IsBanned(be *Backend) bool {
bs := cp.backendState[be] bs := cp.backendState[be]
defer bs.backendStateMux.Unlock() defer bs.backendStateMux.Unlock()
bs.backendStateMux.Lock() bs.backendStateMux.Lock()
return time.Now().Before(bs.bannedUntil) return bs.IsBanned()
} }
// Ban bans a specific backend // Ban bans a specific backend
...@@ -531,6 +491,9 @@ func (cp *ConsensusPoller) Ban(be *Backend) { ...@@ -531,6 +491,9 @@ func (cp *ConsensusPoller) Ban(be *Backend) {
defer bs.backendStateMux.Unlock() defer bs.backendStateMux.Unlock()
bs.backendStateMux.Lock() bs.backendStateMux.Lock()
bs.bannedUntil = time.Now().Add(cp.banPeriod) bs.bannedUntil = time.Now().Add(cp.banPeriod)
// when we ban a node, we give it the chance to start from any block when it is back
bs.latestBlockNumber = 0
bs.safeBlockNumber = 0 bs.safeBlockNumber = 0
bs.finalizedBlockNumber = 0 bs.finalizedBlockNumber = 0
} }
...@@ -613,12 +576,12 @@ func (cp *ConsensusPoller) isInSync(ctx context.Context, be *Backend) (result bo ...@@ -613,12 +576,12 @@ func (cp *ConsensusPoller) isInSync(ctx context.Context, be *Backend) (result bo
return res, nil return res, nil
} }
// getBackendState creates a copy of backend state so that the caller can use it without locking
func (cp *ConsensusPoller) getBackendState(be *Backend) *backendState { func (cp *ConsensusPoller) getBackendState(be *Backend) *backendState {
bs := cp.backendState[be] bs := cp.backendState[be]
defer bs.backendStateMux.Unlock() defer bs.backendStateMux.Unlock()
bs.backendStateMux.Lock() bs.backendStateMux.Lock()
// we return a copy so that the caller can use it without locking
return &backendState{ return &backendState{
latestBlockNumber: bs.latestBlockNumber, latestBlockNumber: bs.latestBlockNumber,
latestBlockHash: bs.latestBlockHash, latestBlockHash: bs.latestBlockHash,
...@@ -634,18 +597,76 @@ func (cp *ConsensusPoller) getBackendState(be *Backend) *backendState { ...@@ -634,18 +597,76 @@ func (cp *ConsensusPoller) getBackendState(be *Backend) *backendState {
func (cp *ConsensusPoller) setBackendState(be *Backend, peerCount uint64, inSync bool, func (cp *ConsensusPoller) setBackendState(be *Backend, peerCount uint64, inSync bool,
latestBlockNumber hexutil.Uint64, latestBlockHash string, latestBlockNumber hexutil.Uint64, latestBlockHash string,
finalizedBlockNumber hexutil.Uint64, finalizedBlockNumber hexutil.Uint64,
safeBlockNumber hexutil.Uint64) (changed bool, updateDelay time.Duration) { safeBlockNumber hexutil.Uint64) bool {
bs := cp.backendState[be] bs := cp.backendState[be]
bs.backendStateMux.Lock() bs.backendStateMux.Lock()
changed = bs.latestBlockHash != latestBlockHash changed := bs.latestBlockHash != latestBlockHash
bs.peerCount = peerCount bs.peerCount = peerCount
bs.inSync = inSync bs.inSync = inSync
bs.latestBlockNumber = latestBlockNumber bs.latestBlockNumber = latestBlockNumber
bs.latestBlockHash = latestBlockHash bs.latestBlockHash = latestBlockHash
bs.finalizedBlockNumber = finalizedBlockNumber bs.finalizedBlockNumber = finalizedBlockNumber
bs.safeBlockNumber = safeBlockNumber bs.safeBlockNumber = safeBlockNumber
updateDelay = time.Since(bs.lastUpdate)
bs.lastUpdate = time.Now() bs.lastUpdate = time.Now()
bs.backendStateMux.Unlock() bs.backendStateMux.Unlock()
return return changed
}
// getConsensusCandidates find out what backends are the candidates to be in the consensus group
// and create a copy of current their state
//
// a candidate is a serving node within the following conditions:
// - not banned
// - healthy (network latency and error rate)
// - with minimum peer count
// - in sync
// - updated recently
// - not lagging latest block
func (cp *ConsensusPoller) getConsensusCandidates() map[*Backend]*backendState {
candidates := make(map[*Backend]*backendState, len(cp.backendGroup.Backends))
for _, be := range cp.backendGroup.Backends {
bs := cp.getBackendState(be)
if bs.IsBanned() {
continue
}
if !be.IsHealthy() {
continue
}
if !be.skipPeerCountCheck && bs.peerCount < cp.minPeerCount {
continue
}
if !bs.inSync {
continue
}
if bs.lastUpdate.Add(cp.maxUpdateThreshold).Before(time.Now()) {
continue
}
candidates[be] = bs
}
// find the highest block, in order to use it defining the highest non-lagging ancestor block
var highestLatestBlock hexutil.Uint64
for _, bs := range candidates {
if bs.latestBlockNumber > highestLatestBlock {
highestLatestBlock = bs.latestBlockNumber
}
}
// find the highest common ancestor block
lagging := make([]*Backend, 0, len(candidates))
for be, bs := range candidates {
// check if backend is lagging behind the highest block
if bs.latestBlockNumber < highestLatestBlock && uint64(highestLatestBlock-bs.latestBlockNumber) > cp.maxBlockLag {
lagging = append(lagging, be)
}
}
// remove lagging backends from the candidates
for _, be := range lagging {
delete(candidates, be)
}
return candidates
} }
...@@ -428,7 +428,7 @@ func TestConsensus(t *testing.T) { ...@@ -428,7 +428,7 @@ func TestConsensus(t *testing.T) {
require.Equal(t, "0xb1", bg.Consensus.GetSafeBlockNumber().String()) require.Equal(t, "0xb1", bg.Consensus.GetSafeBlockNumber().String())
}) })
t.Run("latest dropped below safe, and stayed inconsistent after ban", func(t *testing.T) { t.Run("latest dropped below safe, and stayed inconsistent", func(t *testing.T) {
reset() reset()
useOnlyNode1() useOnlyNode1()
overrideBlock("node1", "latest", "0xd1") overrideBlock("node1", "latest", "0xd1")
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment