Skip to content
Projects
Groups
Snippets
Help
Loading...
Help
Support
Submit feedback
Contribute to GitLab
Sign in
Toggle navigation
N
nebula
Project
Project
Details
Activity
Releases
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
0
Issues
0
List
Boards
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Charts
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
exchain
nebula
Commits
ef976f1e
Unverified
Commit
ef976f1e
authored
May 29, 2023
by
mergify[bot]
Committed by
GitHub
May 29, 2023
Browse files
Options
Browse Files
Download
Plain Diff
Merge branch 'develop' into feat/mainnet-deploy
parents
da2d5c47
693e8f43
Changes
5
Hide whitespace changes
Inline
Side-by-side
Showing
5 changed files
with
58 additions
and
58 deletions
+58
-58
backend.go
proxyd/backend.go
+1
-1
consensus_poller.go
proxyd/consensus_poller.go
+25
-31
consensus_tracker.go
proxyd/consensus_tracker.go
+19
-18
consensus_test.go
proxyd/integration_tests/consensus_test.go
+7
-7
metrics.go
proxyd/metrics.go
+6
-1
No files found.
proxyd/backend.go
View file @
ef976f1e
...
...
@@ -564,8 +564,8 @@ func (bg *BackendGroup) Forward(ctx context.Context, rpcReqs []*RPCReq, isBatch
// We also rewrite block tags to enforce compliance with consensus
rctx
:=
RewriteContext
{
latest
:
bg
.
Consensus
.
GetLatestBlockNumber
(),
finalized
:
bg
.
Consensus
.
GetFinalizedBlockNumber
(),
safe
:
bg
.
Consensus
.
GetSafeBlockNumber
(),
finalized
:
bg
.
Consensus
.
GetFinalizedBlockNumber
(),
}
for
i
,
req
:=
range
rpcReqs
{
...
...
proxyd/consensus_poller.go
View file @
ef976f1e
...
...
@@ -43,11 +43,10 @@ type ConsensusPoller struct {
type
backendState
struct
{
backendStateMux
sync
.
Mutex
latestBlockNumber
hexutil
.
Uint64
latestBlockHash
string
finalizedBlockNumber
hexutil
.
Uint64
latestBlockNumber
hexutil
.
Uint64
latestBlockHash
string
safeBlockNumber
hexutil
.
Uint64
finalizedBlockNumber
hexutil
.
Uint64
peerCount
uint64
inSync
bool
...
...
@@ -77,16 +76,16 @@ func (ct *ConsensusPoller) GetLatestBlockNumber() hexutil.Uint64 {
return
ct
.
tracker
.
GetLatestBlockNumber
()
}
// GetFinalizedBlockNumber returns the `finalized` agreed block number in a consensus
func
(
ct
*
ConsensusPoller
)
GetFinalizedBlockNumber
()
hexutil
.
Uint64
{
return
ct
.
tracker
.
GetFinalizedBlockNumber
()
}
// GetSafeBlockNumber returns the `safe` agreed block number in a consensus
func
(
ct
*
ConsensusPoller
)
GetSafeBlockNumber
()
hexutil
.
Uint64
{
return
ct
.
tracker
.
GetSafeBlockNumber
()
}
// GetFinalizedBlockNumber returns the `finalized` agreed block number in a consensus
func
(
ct
*
ConsensusPoller
)
GetFinalizedBlockNumber
()
hexutil
.
Uint64
{
return
ct
.
tracker
.
GetFinalizedBlockNumber
()
}
func
(
cp
*
ConsensusPoller
)
Shutdown
()
{
cp
.
asyncHandler
.
Shutdown
()
}
...
...
@@ -212,9 +211,6 @@ func NewConsensusPoller(bg *BackendGroup, opts ...ConsensusOpt) *ConsensusPoller
ctx
,
cancelFunc
:=
context
.
WithCancel
(
context
.
Background
())
state
:=
make
(
map
[
*
Backend
]
*
backendState
,
len
(
bg
.
Backends
))
for
_
,
be
:=
range
bg
.
Backends
{
state
[
be
]
=
&
backendState
{}
}
cp
:=
&
ConsensusPoller
{
cancelFunc
:
cancelFunc
,
...
...
@@ -239,6 +235,7 @@ func NewConsensusPoller(bg *BackendGroup, opts ...ConsensusOpt) *ConsensusPoller
cp
.
asyncHandler
=
NewPollerAsyncHandler
(
ctx
,
cp
)
}
cp
.
Reset
()
cp
.
asyncHandler
.
Init
()
return
cp
...
...
@@ -291,15 +288,11 @@ func (cp *ConsensusPoller) UpdateBackend(ctx context.Context, be *Backend) {
log
.
Warn
(
"error updating backend - finalized block"
,
"name"
,
be
.
Name
,
"err"
,
err
)
}
oldFinalized
:=
bs
.
finalizedBlockNumber
oldSafe
:=
bs
.
safeBlockNumber
updateDelay
:=
time
.
Since
(
bs
.
lastUpdate
)
RecordConsensusBackendUpdateDelay
(
be
,
updateDelay
)
RecordConsensusBackendUpdateDelay
(
be
,
bs
.
lastUpdate
)
changed
:=
cp
.
setBackendState
(
be
,
peerCount
,
inSync
,
latestBlockNumber
,
latestBlockHash
,
finalizedBlockNumber
,
safe
BlockNumber
)
safeBlockNumber
,
finalized
BlockNumber
)
RecordBackendLatestBlock
(
be
,
latestBlockNumber
)
RecordBackendSafeBlock
(
be
,
safeBlockNumber
)
...
...
@@ -312,25 +305,25 @@ func (cp *ConsensusPoller) UpdateBackend(ctx context.Context, be *Backend) {
"inSync"
,
inSync
,
"latestBlockNumber"
,
latestBlockNumber
,
"latestBlockHash"
,
latestBlockHash
,
"finalizedBlockNumber"
,
finalizedBlockNumber
,
"safeBlockNumber"
,
safeBlockNumber
,
"updateDelay"
,
updateDelay
)
"finalizedBlockNumber"
,
finalizedBlockNumber
,
"lastUpdate"
,
bs
.
lastUpdate
)
}
// sanity check for latest, safe and finalized block tags
expectedBlockTags
:=
cp
.
checkExpectedBlockTags
(
finalizedBlockNumber
,
oldFinalized
,
safeBlockNumber
,
oldSafe
,
latest
BlockNumber
)
latestBlockNumber
,
bs
.
safeBlockNumber
,
safeBlockNumber
,
bs
.
finalizedBlockNumber
,
finalized
BlockNumber
)
RecordBackendUnexpectedBlockTags
(
be
,
!
expectedBlockTags
)
if
!
expectedBlockTags
{
log
.
Warn
(
"backend banned - unexpected block tags"
,
"backend"
,
be
.
Name
,
"oldFinalized"
,
oldFinalized
,
"oldFinalized"
,
bs
.
finalizedBlockNumber
,
"finalizedBlockNumber"
,
finalizedBlockNumber
,
"oldSafe"
,
oldSafe
,
"oldSafe"
,
bs
.
safeBlockNumber
,
"safeBlockNumber"
,
safeBlockNumber
,
"latestBlockNumber"
,
latestBlockNumber
,
)
...
...
@@ -342,9 +335,10 @@ func (cp *ConsensusPoller) UpdateBackend(ctx context.Context, be *Backend) {
// - finalized block number should never decrease
// - safe block number should never decrease
// - finalized block should be <= safe block <= latest block
func
(
cp
*
ConsensusPoller
)
checkExpectedBlockTags
(
currentFinalized
hexutil
.
Uint64
,
oldFinalized
hexutil
.
Uint64
,
currentSafe
hexutil
.
Uint64
,
oldSafe
hexutil
.
Uint64
,
currentLatest
hexutil
.
Uint64
)
bool
{
func
(
cp
*
ConsensusPoller
)
checkExpectedBlockTags
(
currentLatest
hexutil
.
Uint64
,
oldSafe
hexutil
.
Uint64
,
currentSafe
hexutil
.
Uint64
,
oldFinalized
hexutil
.
Uint64
,
currentFinalized
hexutil
.
Uint64
)
bool
{
return
currentFinalized
>=
oldFinalized
&&
currentSafe
>=
oldSafe
&&
currentFinalized
<=
currentSafe
&&
...
...
@@ -596,8 +590,8 @@ func (cp *ConsensusPoller) getBackendState(be *Backend) *backendState {
func
(
cp
*
ConsensusPoller
)
setBackendState
(
be
*
Backend
,
peerCount
uint64
,
inSync
bool
,
latestBlockNumber
hexutil
.
Uint64
,
latestBlockHash
string
,
finalized
BlockNumber
hexutil
.
Uint64
,
safe
BlockNumber
hexutil
.
Uint64
)
bool
{
safe
BlockNumber
hexutil
.
Uint64
,
finalized
BlockNumber
hexutil
.
Uint64
)
bool
{
bs
:=
cp
.
backendState
[
be
]
bs
.
backendStateMux
.
Lock
()
changed
:=
bs
.
latestBlockHash
!=
latestBlockHash
...
...
@@ -658,7 +652,7 @@ func (cp *ConsensusPoller) getConsensusCandidates() map[*Backend]*backendState {
lagging
:=
make
([]
*
Backend
,
0
,
len
(
candidates
))
for
be
,
bs
:=
range
candidates
{
// check if backend is lagging behind the highest block
if
bs
.
latestBlockNumber
<
highestLatestBlock
&&
uint64
(
highestLatestBlock
-
bs
.
latestBlockNumber
)
>
cp
.
maxBlockLag
{
if
uint64
(
highestLatestBlock
-
bs
.
latestBlockNumber
)
>
cp
.
maxBlockLag
{
lagging
=
append
(
lagging
,
be
)
}
}
...
...
proxyd/consensus_tracker.go
View file @
ef976f1e
...
...
@@ -15,17 +15,17 @@ import (
type
ConsensusTracker
interface
{
GetLatestBlockNumber
()
hexutil
.
Uint64
SetLatestBlockNumber
(
blockNumber
hexutil
.
Uint64
)
GetFinalizedBlockNumber
()
hexutil
.
Uint64
SetFinalizedBlockNumber
(
blockNumber
hexutil
.
Uint64
)
GetSafeBlockNumber
()
hexutil
.
Uint64
SetSafeBlockNumber
(
blockNumber
hexutil
.
Uint64
)
GetFinalizedBlockNumber
()
hexutil
.
Uint64
SetFinalizedBlockNumber
(
blockNumber
hexutil
.
Uint64
)
}
// InMemoryConsensusTracker store and retrieve in memory, async-safe
type
InMemoryConsensusTracker
struct
{
latestBlockNumber
hexutil
.
Uint64
finalizedBlockNumber
hexutil
.
Uint64
safeBlockNumber
hexutil
.
Uint64
finalizedBlockNumber
hexutil
.
Uint64
mutex
sync
.
Mutex
}
...
...
@@ -49,32 +49,32 @@ func (ct *InMemoryConsensusTracker) SetLatestBlockNumber(blockNumber hexutil.Uin
ct
.
latestBlockNumber
=
blockNumber
}
func
(
ct
*
InMemoryConsensusTracker
)
Get
Finalized
BlockNumber
()
hexutil
.
Uint64
{
func
(
ct
*
InMemoryConsensusTracker
)
Get
Safe
BlockNumber
()
hexutil
.
Uint64
{
defer
ct
.
mutex
.
Unlock
()
ct
.
mutex
.
Lock
()
return
ct
.
finalized
BlockNumber
return
ct
.
safe
BlockNumber
}
func
(
ct
*
InMemoryConsensusTracker
)
Set
Finalized
BlockNumber
(
blockNumber
hexutil
.
Uint64
)
{
func
(
ct
*
InMemoryConsensusTracker
)
Set
Safe
BlockNumber
(
blockNumber
hexutil
.
Uint64
)
{
defer
ct
.
mutex
.
Unlock
()
ct
.
mutex
.
Lock
()
ct
.
finalized
BlockNumber
=
blockNumber
ct
.
safe
BlockNumber
=
blockNumber
}
func
(
ct
*
InMemoryConsensusTracker
)
Get
Safe
BlockNumber
()
hexutil
.
Uint64
{
func
(
ct
*
InMemoryConsensusTracker
)
Get
Finalized
BlockNumber
()
hexutil
.
Uint64
{
defer
ct
.
mutex
.
Unlock
()
ct
.
mutex
.
Lock
()
return
ct
.
safe
BlockNumber
return
ct
.
finalized
BlockNumber
}
func
(
ct
*
InMemoryConsensusTracker
)
Set
Safe
BlockNumber
(
blockNumber
hexutil
.
Uint64
)
{
func
(
ct
*
InMemoryConsensusTracker
)
Set
Finalized
BlockNumber
(
blockNumber
hexutil
.
Uint64
)
{
defer
ct
.
mutex
.
Unlock
()
ct
.
mutex
.
Lock
()
ct
.
safe
BlockNumber
=
blockNumber
ct
.
finalized
BlockNumber
=
blockNumber
}
// RedisConsensusTracker uses a Redis `client` to store and retrieve consensus, async-safe
...
...
@@ -104,13 +104,6 @@ func (ct *RedisConsensusTracker) SetLatestBlockNumber(blockNumber hexutil.Uint64
ct
.
client
.
Set
(
ct
.
ctx
,
ct
.
key
(
"latest"
),
blockNumber
,
0
)
}
func
(
ct
*
RedisConsensusTracker
)
GetFinalizedBlockNumber
()
hexutil
.
Uint64
{
return
hexutil
.
Uint64
(
hexutil
.
MustDecodeUint64
(
ct
.
client
.
Get
(
ct
.
ctx
,
ct
.
key
(
"finalized"
))
.
Val
()))
}
func
(
ct
*
RedisConsensusTracker
)
SetFinalizedBlockNumber
(
blockNumber
hexutil
.
Uint64
)
{
ct
.
client
.
Set
(
ct
.
ctx
,
ct
.
key
(
"finalized"
),
blockNumber
,
0
)
}
func
(
ct
*
RedisConsensusTracker
)
GetSafeBlockNumber
()
hexutil
.
Uint64
{
return
hexutil
.
Uint64
(
hexutil
.
MustDecodeUint64
(
ct
.
client
.
Get
(
ct
.
ctx
,
ct
.
key
(
"safe"
))
.
Val
()))
}
...
...
@@ -118,3 +111,11 @@ func (ct *RedisConsensusTracker) GetSafeBlockNumber() hexutil.Uint64 {
func
(
ct
*
RedisConsensusTracker
)
SetSafeBlockNumber
(
blockNumber
hexutil
.
Uint64
)
{
ct
.
client
.
Set
(
ct
.
ctx
,
ct
.
key
(
"safe"
),
blockNumber
,
0
)
}
func
(
ct
*
RedisConsensusTracker
)
GetFinalizedBlockNumber
()
hexutil
.
Uint64
{
return
hexutil
.
Uint64
(
hexutil
.
MustDecodeUint64
(
ct
.
client
.
Get
(
ct
.
ctx
,
ct
.
key
(
"finalized"
))
.
Val
()))
}
func
(
ct
*
RedisConsensusTracker
)
SetFinalizedBlockNumber
(
blockNumber
hexutil
.
Uint64
)
{
ct
.
client
.
Set
(
ct
.
ctx
,
ct
.
key
(
"finalized"
),
blockNumber
,
0
)
}
proxyd/integration_tests/consensus_test.go
View file @
ef976f1e
...
...
@@ -267,8 +267,8 @@ func TestConsensus(t *testing.T) {
overrideBlock
(
"node2"
,
"safe"
,
"0xe2"
)
update
()
require
.
Equal
(
t
,
"0xc1"
,
bg
.
Consensus
.
GetFinalizedBlockNumber
()
.
String
())
require
.
Equal
(
t
,
"0xe1"
,
bg
.
Consensus
.
GetSafeBlockNumber
()
.
String
())
require
.
Equal
(
t
,
"0xc1"
,
bg
.
Consensus
.
GetFinalizedBlockNumber
()
.
String
())
})
t
.
Run
(
"advance safe and finalized"
,
func
(
t
*
testing
.
T
)
{
...
...
@@ -279,8 +279,8 @@ func TestConsensus(t *testing.T) {
overrideBlock
(
"node2"
,
"safe"
,
"0xe2"
)
update
()
require
.
Equal
(
t
,
"0xc2"
,
bg
.
Consensus
.
GetFinalizedBlockNumber
()
.
String
())
require
.
Equal
(
t
,
"0xe2"
,
bg
.
Consensus
.
GetSafeBlockNumber
()
.
String
())
require
.
Equal
(
t
,
"0xc2"
,
bg
.
Consensus
.
GetFinalizedBlockNumber
()
.
String
())
})
t
.
Run
(
"ban backend if error rate is too high"
,
func
(
t
*
testing
.
T
)
{
...
...
@@ -317,8 +317,8 @@ func TestConsensus(t *testing.T) {
overrideBlock
(
"node1"
,
"safe"
,
"0xa1"
)
update
()
require
.
Equal
(
t
,
"0xc1"
,
bg
.
Consensus
.
GetFinalizedBlockNumber
()
.
String
())
require
.
Equal
(
t
,
"0xe1"
,
bg
.
Consensus
.
GetSafeBlockNumber
()
.
String
())
require
.
Equal
(
t
,
"0xc1"
,
bg
.
Consensus
.
GetFinalizedBlockNumber
()
.
String
())
consensusGroup
:=
bg
.
Consensus
.
GetConsensusGroup
()
require
.
NotContains
(
t
,
consensusGroup
,
nodes
[
"node1"
]
.
backend
)
...
...
@@ -349,8 +349,8 @@ func TestConsensus(t *testing.T) {
update
()
require
.
Equal
(
t
,
"0x101"
,
bg
.
Consensus
.
GetLatestBlockNumber
()
.
String
())
require
.
Equal
(
t
,
"0xc1"
,
bg
.
Consensus
.
GetFinalizedBlockNumber
()
.
String
())
require
.
Equal
(
t
,
"0xe1"
,
bg
.
Consensus
.
GetSafeBlockNumber
()
.
String
())
require
.
Equal
(
t
,
"0xc1"
,
bg
.
Consensus
.
GetFinalizedBlockNumber
()
.
String
())
consensusGroup
:=
bg
.
Consensus
.
GetConsensusGroup
()
require
.
NotContains
(
t
,
consensusGroup
,
nodes
[
"node1"
]
.
backend
)
...
...
@@ -365,8 +365,8 @@ func TestConsensus(t *testing.T) {
update
()
require
.
Equal
(
t
,
"0x101"
,
bg
.
Consensus
.
GetLatestBlockNumber
()
.
String
())
require
.
Equal
(
t
,
"0xc1"
,
bg
.
Consensus
.
GetFinalizedBlockNumber
()
.
String
())
require
.
Equal
(
t
,
"0xe1"
,
bg
.
Consensus
.
GetSafeBlockNumber
()
.
String
())
require
.
Equal
(
t
,
"0xc1"
,
bg
.
Consensus
.
GetFinalizedBlockNumber
()
.
String
())
consensusGroup
:=
bg
.
Consensus
.
GetConsensusGroup
()
require
.
NotContains
(
t
,
consensusGroup
,
nodes
[
"node1"
]
.
backend
)
...
...
@@ -397,8 +397,8 @@ func TestConsensus(t *testing.T) {
require
.
Equal
(
t
,
1
,
len
(
consensusGroup
))
require
.
Equal
(
t
,
"0xd1"
,
bg
.
Consensus
.
GetLatestBlockNumber
()
.
String
())
require
.
Equal
(
t
,
"0x91"
,
bg
.
Consensus
.
GetFinalizedBlockNumber
()
.
String
())
require
.
Equal
(
t
,
"0xb1"
,
bg
.
Consensus
.
GetSafeBlockNumber
()
.
String
())
require
.
Equal
(
t
,
"0x91"
,
bg
.
Consensus
.
GetFinalizedBlockNumber
()
.
String
())
})
t
.
Run
(
"latest dropped below safe, then recovered"
,
func
(
t
*
testing
.
T
)
{
...
...
@@ -424,8 +424,8 @@ func TestConsensus(t *testing.T) {
require
.
Equal
(
t
,
1
,
len
(
consensusGroup
))
require
.
Equal
(
t
,
"0xd1"
,
bg
.
Consensus
.
GetLatestBlockNumber
()
.
String
())
require
.
Equal
(
t
,
"0x91"
,
bg
.
Consensus
.
GetFinalizedBlockNumber
()
.
String
())
require
.
Equal
(
t
,
"0xb1"
,
bg
.
Consensus
.
GetSafeBlockNumber
()
.
String
())
require
.
Equal
(
t
,
"0x91"
,
bg
.
Consensus
.
GetFinalizedBlockNumber
()
.
String
())
})
t
.
Run
(
"latest dropped below safe, and stayed inconsistent"
,
func
(
t
*
testing
.
T
)
{
...
...
proxyd/metrics.go
View file @
ef976f1e
...
...
@@ -482,7 +482,12 @@ func RecordConsensusBackendInSync(b *Backend, inSync bool) {
consensusInSyncBackend
.
WithLabelValues
(
b
.
Name
)
.
Set
(
boolToFloat64
(
inSync
))
}
func
RecordConsensusBackendUpdateDelay
(
b
*
Backend
,
delay
time
.
Duration
)
{
func
RecordConsensusBackendUpdateDelay
(
b
*
Backend
,
lastUpdate
time
.
Time
)
{
// avoid recording the delay for the first update
if
lastUpdate
.
IsZero
()
{
return
}
delay
:=
time
.
Since
(
lastUpdate
)
consensusUpdateDelayBackend
.
WithLabelValues
(
b
.
Name
)
.
Set
(
float64
(
delay
.
Milliseconds
()))
}
...
...
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment