Skip to content
Projects
Groups
Snippets
Help
Loading...
Help
Support
Submit feedback
Contribute to GitLab
Sign in
Toggle navigation
N
nebula
Project
Project
Details
Activity
Releases
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
0
Issues
0
List
Boards
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Charts
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
exchain
nebula
Commits
1270a43c
Commit
1270a43c
authored
May 27, 2023
by
Felipe Andrade
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
refactor poller
parent
dad7aeee
Changes
1
Hide whitespace changes
Inline
Side-by-side
Showing
1 changed file
with
109 additions
and
103 deletions
+109
-103
consensus_poller.go
proxyd/consensus_poller.go
+109
-103
No files found.
proxyd/consensus_poller.go
View file @
1270a43c
...
...
@@ -34,8 +34,7 @@ type ConsensusPoller struct {
tracker
ConsensusTracker
asyncHandler
ConsensusAsyncHandler
minPeerCount
uint64
minPeerCount
uint64
banPeriod
time
.
Duration
maxUpdateThreshold
time
.
Duration
maxBlockLag
uint64
...
...
@@ -220,7 +219,7 @@ func NewConsensusPoller(bg *BackendGroup, opts ...ConsensusOpt) *ConsensusPoller
banPeriod
:
5
*
time
.
Minute
,
maxUpdateThreshold
:
30
*
time
.
Second
,
maxBlockLag
:
8
,
//
quarter of an epoch,
8*12 seconds = 96 seconds ~ 1.6 minutes
maxBlockLag
:
8
,
// 8*12 seconds = 96 seconds ~ 1.6 minutes
minPeerCount
:
3
,
}
...
...
@@ -253,12 +252,11 @@ func (cp *ConsensusPoller) UpdateBackend(ctx context.Context, be *Backend) {
// if backend is not healthy state we'll only resume checking it after ban
if
!
be
.
IsHealthy
()
{
log
.
Warn
(
"backend banned - not
online or not
healthy"
,
"backend"
,
be
.
Name
)
log
.
Warn
(
"backend banned - not healthy"
,
"backend"
,
be
.
Name
)
cp
.
Ban
(
be
)
return
}
// if backend it not in sync we'll check again after ban
inSync
,
err
:=
cp
.
isInSync
(
ctx
,
be
)
RecordConsensusBackendInSync
(
be
,
err
==
nil
&&
inSync
)
if
err
!=
nil
{
...
...
@@ -276,21 +274,27 @@ func (cp *ConsensusPoller) UpdateBackend(ctx context.Context, be *Backend) {
latestBlockNumber
,
latestBlockHash
,
err
:=
cp
.
fetchBlock
(
ctx
,
be
,
"latest"
)
if
err
!=
nil
{
log
.
Warn
(
"error updating backend"
,
"name"
,
be
.
Name
,
"err"
,
err
)
log
.
Warn
(
"error updating backend
- latest block
"
,
"name"
,
be
.
Name
,
"err"
,
err
)
}
safeBlockNumber
,
_
,
err
:=
cp
.
fetchBlock
(
ctx
,
be
,
"safe"
)
if
err
!=
nil
{
log
.
Warn
(
"error updating backend"
,
"name"
,
be
.
Name
,
"err"
,
err
)
log
.
Warn
(
"error updating backend
- safe block
"
,
"name"
,
be
.
Name
,
"err"
,
err
)
}
finalizedBlockNumber
,
_
,
err
:=
cp
.
fetchBlock
(
ctx
,
be
,
"finalized"
)
if
err
!=
nil
{
log
.
Warn
(
"error updating backend"
,
"name"
,
be
.
Name
,
"err"
,
err
)
log
.
Warn
(
"error updating backend
- finalized block
"
,
"name"
,
be
.
Name
,
"err"
,
err
)
}
_
,
_
,
_
,
_
,
oldFinalized
,
oldSafe
,
_
,
_
:=
cp
.
getBackendState
(
be
)
expectedBlockTags
:=
cp
.
checkExpectedBlockTags
(
finalizedBlockNumber
,
oldFinalized
,
safeBlockNumber
,
oldSafe
,
latestBlockNumber
)
bs
:=
cp
.
getBackendState
(
be
)
oldFinalized
:=
bs
.
finalizedBlockNumber
oldSafe
:=
bs
.
safeBlockNumber
expectedBlockTags
:=
cp
.
checkExpectedBlockTags
(
finalizedBlockNumber
,
oldFinalized
,
safeBlockNumber
,
oldSafe
,
latestBlockNumber
)
changed
,
updateDelay
:=
cp
.
setBackendState
(
be
,
peerCount
,
inSync
,
latestBlockNumber
,
latestBlockHash
,
...
...
@@ -342,116 +346,108 @@ func (cp *ConsensusPoller) checkExpectedBlockTags(currentFinalized hexutil.Uint6
// UpdateBackendGroupConsensus resolves the current group consensus based on the state of the backends
func
(
cp
*
ConsensusPoller
)
UpdateBackendGroupConsensus
(
ctx
context
.
Context
)
{
var
highestLatestBlock
hexutil
.
Uint64
var
lowestLatestBlock
hexutil
.
Uint64
var
lowestLatestBlockHash
string
var
lowestFinalizedBlock
hexutil
.
Uint64
var
lowestSafeBlock
hexutil
.
Uint64
// get the latest block number from the tracker
currentConsensusBlockNumber
:=
cp
.
GetLatestBlockNumber
()
// find the highest block, in order to use it defining the highest non-lagging ancestor block
// find out what backends are the candidates to be in the consensus group
// and create a copy of current their state
//
// a serving node needs to be:
// - not banned
// - healthy (network latency and error rate)
// - with minimum peer count
// - in sync
// - updated recently
// - not lagging latest block
candidates
:=
make
(
map
[
*
Backend
]
*
backendState
,
len
(
cp
.
backendGroup
.
Backends
))
filteredBackendsNames
:=
make
([]
string
,
0
,
len
(
cp
.
backendGroup
.
Backends
))
for
_
,
be
:=
range
cp
.
backendGroup
.
Backends
{
peerCount
,
inSync
,
backendLatestBlockNumber
,
_
,
_
,
_
,
lastUpdate
,
_
:=
cp
.
getBackendState
(
be
)
if
cp
.
IsBanned
(
be
)
{
continu
e
bs
:=
cp
.
getBackendState
(
be
)
passed
:=
true
if
time
.
Now
()
.
Before
(
bs
.
bannedUntil
)
{
passed
=
fals
e
}
if
!
be
.
skipPeerCountCheck
&&
peerCount
<
cp
.
minPeerCount
{
continu
e
if
!
be
.
IsHealthy
()
{
passed
=
fals
e
}
if
!
inSync
{
continu
e
if
!
be
.
skipPeerCountCheck
&&
bs
.
peerCount
<
cp
.
minPeerCount
{
passed
=
fals
e
}
if
lastUpdate
.
Add
(
cp
.
maxUpdateThreshold
)
.
Before
(
time
.
Now
())
{
continu
e
if
!
bs
.
inSync
{
passed
=
fals
e
}
if
backendLatestBlockNumber
>
highestLatestBlock
{
highestLatestBlock
=
backendLatestBlockNumber
if
bs
.
lastUpdate
.
Add
(
cp
.
maxUpdateThreshold
)
.
Before
(
time
.
Now
())
{
passed
=
false
}
if
passed
{
candidates
[
be
]
=
bs
}
else
{
filteredBackendsNames
=
append
(
filteredBackendsNames
,
be
.
Name
)
}
}
// find the highest common ancestor block
for
_
,
be
:=
range
cp
.
backendGroup
.
Backends
{
peerCount
,
inSync
,
backendLatestBlockNumber
,
backendLatestBlockHash
,
backendFinalizedBlockNumber
,
backendSafeBlockNumber
,
lastUpdate
,
_
:=
cp
.
getBackendState
(
be
)
if
cp
.
IsBanned
(
be
)
{
continue
}
if
!
be
.
skipPeerCountCheck
&&
peerCount
<
cp
.
minPeerCount
{
continue
}
if
!
inSync
{
continue
}
if
lastUpdate
.
Add
(
cp
.
maxUpdateThreshold
)
.
Before
(
time
.
Now
())
{
continue
// find the highest block, in order to use it defining the highest non-lagging ancestor block
var
highestLatestBlock
hexutil
.
Uint64
for
_
,
bs
:=
range
candidates
{
if
bs
.
latestBlockNumber
>
highestLatestBlock
{
highestLatestBlock
=
bs
.
latestBlockNumber
}
}
// find the highest common ancestor block
var
lowestLatestBlock
hexutil
.
Uint64
var
lowestLatestBlockHash
string
var
lowestFinalizedBlock
hexutil
.
Uint64
var
lowestSafeBlock
hexutil
.
Uint64
lagging
:=
make
([]
*
Backend
,
0
,
len
(
candidates
))
for
be
,
bs
:=
range
candidates
{
// check if backend is lagging behind the highest block
if
backendLatestBlockNumber
<
highestLatestBlock
&&
uint64
(
highestLatestBlock
-
backendLatestBlockNumber
)
>
cp
.
maxBlockLag
{
if
bs
.
latestBlockNumber
<
highestLatestBlock
&&
uint64
(
highestLatestBlock
-
bs
.
latestBlockNumber
)
>
cp
.
maxBlockLag
{
lagging
=
append
(
lagging
,
be
)
continue
}
if
lowestLatestBlock
==
0
||
backendLatestBlockNumber
<
lowestLatestBlock
{
lowestLatestBlock
=
backendLatestBlockNumber
lowestLatestBlockHash
=
backendLatestBlockHash
// update the lowest common ancestor block
if
lowestLatestBlock
==
0
||
bs
.
latestBlockNumber
<
lowestLatestBlock
{
lowestLatestBlock
=
bs
.
latestBlockNumber
lowestLatestBlockHash
=
bs
.
latestBlockHash
}
if
lowestFinalizedBlock
==
0
||
backendFinalizedBlockNumber
<
lowestFinalizedBlock
{
lowestFinalizedBlock
=
backendFinalizedBlockNumber
// update the lowest finalized block
if
lowestFinalizedBlock
==
0
||
bs
.
finalizedBlockNumber
<
lowestFinalizedBlock
{
lowestFinalizedBlock
=
bs
.
finalizedBlockNumber
}
if
lowestSafeBlock
==
0
||
backendSafeBlockNumber
<
lowestSafeBlock
{
lowestSafeBlock
=
backendSafeBlockNumber
// update the lowest safe block
if
lowestSafeBlock
==
0
||
bs
.
safeBlockNumber
<
lowestSafeBlock
{
lowestSafeBlock
=
bs
.
safeBlockNumber
}
}
// remove lagging backends from the candidates
for
_
,
be
:=
range
lagging
{
filteredBackendsNames
=
append
(
filteredBackendsNames
,
be
.
Name
)
delete
(
candidates
,
be
)
}
// find the proposed block among the candidates
proposedBlock
:=
lowestLatestBlock
proposedBlockHash
:=
lowestLatestBlockHash
hasConsensus
:=
false
// check if everybody agrees on the same block hash
consensusBackends
:=
make
([]
*
Backend
,
0
,
len
(
cp
.
backendGroup
.
Backends
))
consensusBackendsNames
:=
make
([]
string
,
0
,
len
(
cp
.
backendGroup
.
Backends
))
filteredBackendsNames
:=
make
([]
string
,
0
,
len
(
cp
.
backendGroup
.
Backends
))
if
lowestLatestBlock
>
currentConsensusBlockNumber
{
log
.
Debug
(
"validating consensus on block"
,
"lowestLatestBlock"
,
lowestLatestBlock
)
}
// if there is no block to propose, the consensus is automatically broken
// this can happen when backends have just recovered
broken
:=
proposedBlock
==
0
&&
currentConsensusBlockNumber
>
0
if
proposedBlock
>
0
{
for
!
hasConsensus
{
allAgreed
:=
true
consensusBackends
=
consensusBackends
[
:
0
]
filteredBackendsNames
=
filteredBackendsNames
[
:
0
]
for
_
,
be
:=
range
cp
.
backendGroup
.
Backends
{
/*
a serving node needs to be:
- healthy (network)
- updated recently
- not banned
- with minimum peer count
- not lagging latest block
- in sync
*/
peerCount
,
inSync
,
latestBlockNumber
,
_
,
_
,
_
,
lastUpdate
,
bannedUntil
:=
cp
.
getBackendState
(
be
)
notUpdated
:=
lastUpdate
.
Add
(
cp
.
maxUpdateThreshold
)
.
Before
(
time
.
Now
())
isBanned
:=
time
.
Now
()
.
Before
(
bannedUntil
)
notEnoughPeers
:=
!
be
.
skipPeerCountCheck
&&
peerCount
<
cp
.
minPeerCount
lagging
:=
latestBlockNumber
<
proposedBlock
if
!
be
.
IsHealthy
()
||
notUpdated
||
isBanned
||
notEnoughPeers
||
lagging
||
!
inSync
{
filteredBackendsNames
=
append
(
filteredBackendsNames
,
be
.
Name
)
continue
}
for
be
,
_
:=
range
candidates
{
actualBlockNumber
,
actualBlockHash
,
err
:=
cp
.
fetchBlock
(
ctx
,
be
,
proposedBlock
.
String
())
if
err
!=
nil
{
log
.
Warn
(
"error updating backend"
,
"name"
,
be
.
Name
,
"err"
,
err
)
...
...
@@ -469,8 +465,6 @@ func (cp *ConsensusPoller) UpdateBackendGroupConsensus(ctx context.Context) {
allAgreed
=
false
break
}
consensusBackends
=
append
(
consensusBackends
,
be
)
consensusBackendsNames
=
append
(
consensusBackendsNames
,
be
.
Name
)
}
if
allAgreed
{
hasConsensus
=
true
...
...
@@ -488,26 +482,39 @@ func (cp *ConsensusPoller) UpdateBackendGroupConsensus(ctx context.Context) {
for
_
,
l
:=
range
cp
.
listeners
{
l
()
}
log
.
Info
(
"consensus broken"
,
"currentConsensusBlockNumber"
,
currentConsensusBlockNumber
,
"proposedBlock"
,
proposedBlock
,
"proposedBlockHash"
,
proposedBlockHash
)
log
.
Info
(
"consensus broken"
,
"currentConsensusBlockNumber"
,
currentConsensusBlockNumber
,
"proposedBlock"
,
proposedBlock
,
"proposedBlockHash"
,
proposedBlockHash
)
}
cp
.
tracker
.
SetLatestBlockNumber
(
proposedBlock
)
cp
.
tracker
.
SetSafeBlockNumber
(
lowestSafeBlock
)
cp
.
tracker
.
SetFinalizedBlockNumber
(
lowestFinalizedBlock
)
// update consensus group
group
:=
make
([]
*
Backend
,
0
,
len
(
candidates
))
consensusBackendsNames
:=
make
([]
string
,
0
,
len
(
candidates
))
for
be
,
_
:=
range
candidates
{
group
=
append
(
group
,
be
)
consensusBackendsNames
=
append
(
consensusBackendsNames
,
be
.
Name
)
}
cp
.
consensusGroupMux
.
Lock
()
cp
.
consensusGroup
=
consensusBackends
cp
.
consensusGroup
=
group
cp
.
consensusGroupMux
.
Unlock
()
RecordGroupConsensusLatestBlock
(
cp
.
backendGroup
,
proposedBlock
)
RecordGroupConsensusSafeBlock
(
cp
.
backendGroup
,
lowestSafeBlock
)
RecordGroupConsensusFinalizedBlock
(
cp
.
backendGroup
,
lowestFinalizedBlock
)
RecordGroupConsensusCount
(
cp
.
backendGroup
,
len
(
consensusBackends
))
RecordGroupConsensusCount
(
cp
.
backendGroup
,
len
(
group
))
RecordGroupConsensusFilteredCount
(
cp
.
backendGroup
,
len
(
filteredBackendsNames
))
RecordGroupTotalCount
(
cp
.
backendGroup
,
len
(
cp
.
backendGroup
.
Backends
))
log
.
Debug
(
"group state"
,
"proposedBlock"
,
proposedBlock
,
"consensusBackends"
,
strings
.
Join
(
consensusBackendsNames
,
", "
),
"filteredBackends"
,
strings
.
Join
(
filteredBackendsNames
,
", "
))
log
.
Debug
(
"group state"
,
"proposedBlock"
,
proposedBlock
,
"consensusBackends"
,
strings
.
Join
(
consensusBackendsNames
,
", "
),
"filteredBackends"
,
strings
.
Join
(
filteredBackendsNames
,
", "
))
}
// IsBanned checks if a specific backend is banned
...
...
@@ -606,23 +613,22 @@ func (cp *ConsensusPoller) isInSync(ctx context.Context, be *Backend) (result bo
return
res
,
nil
}
func
(
cp
*
ConsensusPoller
)
getBackendState
(
be
*
Backend
)
(
peerCount
uint64
,
inSync
bool
,
latestBlockNumber
hexutil
.
Uint64
,
latestBlockHash
string
,
finalizedBlockNumber
hexutil
.
Uint64
,
safeBlockNumber
hexutil
.
Uint64
,
lastUpdate
time
.
Time
,
bannedUntil
time
.
Time
)
{
func
(
cp
*
ConsensusPoller
)
getBackendState
(
be
*
Backend
)
*
backendState
{
bs
:=
cp
.
backendState
[
be
]
defer
bs
.
backendStateMux
.
Unlock
()
bs
.
backendStateMux
.
Lock
()
peerCount
=
bs
.
peerCount
inSync
=
bs
.
inSync
latestBlockNumber
=
bs
.
latestBlockNumber
latestBlockHash
=
bs
.
latestBlockHash
finalizedBlockNumber
=
bs
.
finalizedBlockNumber
safeBlockNumber
=
bs
.
safeBlockNumber
lastUpdate
=
bs
.
lastUpdate
bannedUntil
=
bs
.
bannedUntil
return
// we return a copy so that the caller can use it without locking
return
&
backendState
{
latestBlockNumber
:
bs
.
latestBlockNumber
,
latestBlockHash
:
bs
.
latestBlockHash
,
safeBlockNumber
:
bs
.
safeBlockNumber
,
finalizedBlockNumber
:
bs
.
finalizedBlockNumber
,
peerCount
:
bs
.
peerCount
,
inSync
:
bs
.
inSync
,
lastUpdate
:
bs
.
lastUpdate
,
bannedUntil
:
bs
.
bannedUntil
,
}
}
func
(
cp
*
ConsensusPoller
)
setBackendState
(
be
*
Backend
,
peerCount
uint64
,
inSync
bool
,
...
...
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment