Skip to content
Projects
Groups
Snippets
Help
Loading...
Help
Support
Submit feedback
Contribute to GitLab
Sign in
Toggle navigation
N
nebula
Project
Project
Details
Activity
Releases
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
0
Issues
0
List
Boards
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Charts
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
exchain
nebula
Commits
3b4bcd69
Commit
3b4bcd69
authored
May 27, 2023
by
Felipe Andrade
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
order params chronologically: latest, safe, finalized
parent
210b3eac
Changes
4
Hide whitespace changes
Inline
Side-by-side
Showing
4 changed files
with
48 additions
and
51 deletions
+48
-51
backend.go
proxyd/backend.go
+1
-1
consensus_poller.go
proxyd/consensus_poller.go
+21
-25
consensus_tracker.go
proxyd/consensus_tracker.go
+19
-18
consensus_test.go
proxyd/integration_tests/consensus_test.go
+7
-7
No files found.
proxyd/backend.go
View file @
3b4bcd69
...
...
@@ -564,8 +564,8 @@ func (bg *BackendGroup) Forward(ctx context.Context, rpcReqs []*RPCReq, isBatch
// We also rewrite block tags to enforce compliance with consensus
rctx
:=
RewriteContext
{
latest
:
bg
.
Consensus
.
GetLatestBlockNumber
(),
finalized
:
bg
.
Consensus
.
GetFinalizedBlockNumber
(),
safe
:
bg
.
Consensus
.
GetSafeBlockNumber
(),
finalized
:
bg
.
Consensus
.
GetFinalizedBlockNumber
(),
}
for
i
,
req
:=
range
rpcReqs
{
...
...
proxyd/consensus_poller.go
View file @
3b4bcd69
...
...
@@ -43,11 +43,10 @@ type ConsensusPoller struct {
type
backendState
struct
{
backendStateMux
sync
.
Mutex
latestBlockNumber
hexutil
.
Uint64
latestBlockHash
string
finalizedBlockNumber
hexutil
.
Uint64
latestBlockNumber
hexutil
.
Uint64
latestBlockHash
string
safeBlockNumber
hexutil
.
Uint64
finalizedBlockNumber
hexutil
.
Uint64
peerCount
uint64
inSync
bool
...
...
@@ -77,16 +76,16 @@ func (ct *ConsensusPoller) GetLatestBlockNumber() hexutil.Uint64 {
return
ct
.
tracker
.
GetLatestBlockNumber
()
}
// GetFinalizedBlockNumber returns the `finalized` agreed block number in a consensus
func
(
ct
*
ConsensusPoller
)
GetFinalizedBlockNumber
()
hexutil
.
Uint64
{
return
ct
.
tracker
.
GetFinalizedBlockNumber
()
}
// GetSafeBlockNumber returns the `safe` agreed block number in a consensus
func
(
ct
*
ConsensusPoller
)
GetSafeBlockNumber
()
hexutil
.
Uint64
{
return
ct
.
tracker
.
GetSafeBlockNumber
()
}
// GetFinalizedBlockNumber returns the `finalized` agreed block number in a consensus
func
(
ct
*
ConsensusPoller
)
GetFinalizedBlockNumber
()
hexutil
.
Uint64
{
return
ct
.
tracker
.
GetFinalizedBlockNumber
()
}
func
(
cp
*
ConsensusPoller
)
Shutdown
()
{
cp
.
asyncHandler
.
Shutdown
()
}
...
...
@@ -289,15 +288,11 @@ func (cp *ConsensusPoller) UpdateBackend(ctx context.Context, be *Backend) {
log
.
Warn
(
"error updating backend - finalized block"
,
"name"
,
be
.
Name
,
"err"
,
err
)
}
// just for readability
oldFinalized
:=
bs
.
finalizedBlockNumber
oldSafe
:=
bs
.
safeBlockNumber
RecordConsensusBackendUpdateDelay
(
be
,
bs
.
lastUpdate
)
changed
:=
cp
.
setBackendState
(
be
,
peerCount
,
inSync
,
latestBlockNumber
,
latestBlockHash
,
finalizedBlockNumber
,
safe
BlockNumber
)
safeBlockNumber
,
finalized
BlockNumber
)
RecordBackendLatestBlock
(
be
,
latestBlockNumber
)
RecordBackendSafeBlock
(
be
,
safeBlockNumber
)
...
...
@@ -310,25 +305,25 @@ func (cp *ConsensusPoller) UpdateBackend(ctx context.Context, be *Backend) {
"inSync"
,
inSync
,
"latestBlockNumber"
,
latestBlockNumber
,
"latestBlockHash"
,
latestBlockHash
,
"finalizedBlockNumber"
,
finalizedBlockNumber
,
"safeBlockNumber"
,
safeBlockNumber
,
"finalizedBlockNumber"
,
finalizedBlockNumber
,
"lastUpdate"
,
bs
.
lastUpdate
)
}
// sanity check for latest, safe and finalized block tags
expectedBlockTags
:=
cp
.
checkExpectedBlockTags
(
finalizedBlockNumber
,
oldFinalized
,
safeBlockNumber
,
oldSafe
,
latest
BlockNumber
)
latestBlockNumber
,
bs
.
safeBlockNumber
,
safeBlockNumber
,
bs
.
finalizedBlockNumber
,
finalized
BlockNumber
)
RecordBackendUnexpectedBlockTags
(
be
,
!
expectedBlockTags
)
if
!
expectedBlockTags
{
log
.
Warn
(
"backend banned - unexpected block tags"
,
"backend"
,
be
.
Name
,
"oldFinalized"
,
oldFinalized
,
"oldFinalized"
,
bs
.
finalizedBlockNumber
,
"finalizedBlockNumber"
,
finalizedBlockNumber
,
"oldSafe"
,
oldSafe
,
"oldSafe"
,
bs
.
safeBlockNumber
,
"safeBlockNumber"
,
safeBlockNumber
,
"latestBlockNumber"
,
latestBlockNumber
,
)
...
...
@@ -340,9 +335,10 @@ func (cp *ConsensusPoller) UpdateBackend(ctx context.Context, be *Backend) {
// - finalized block number should never decrease
// - safe block number should never decrease
// - finalized block should be <= safe block <= latest block
func
(
cp
*
ConsensusPoller
)
checkExpectedBlockTags
(
currentFinalized
hexutil
.
Uint64
,
oldFinalized
hexutil
.
Uint64
,
currentSafe
hexutil
.
Uint64
,
oldSafe
hexutil
.
Uint64
,
currentLatest
hexutil
.
Uint64
)
bool
{
func
(
cp
*
ConsensusPoller
)
checkExpectedBlockTags
(
currentLatest
hexutil
.
Uint64
,
oldSafe
hexutil
.
Uint64
,
currentSafe
hexutil
.
Uint64
,
oldFinalized
hexutil
.
Uint64
,
currentFinalized
hexutil
.
Uint64
)
bool
{
return
currentFinalized
>=
oldFinalized
&&
currentSafe
>=
oldSafe
&&
currentFinalized
<=
currentSafe
&&
...
...
@@ -594,8 +590,8 @@ func (cp *ConsensusPoller) getBackendState(be *Backend) *backendState {
func
(
cp
*
ConsensusPoller
)
setBackendState
(
be
*
Backend
,
peerCount
uint64
,
inSync
bool
,
latestBlockNumber
hexutil
.
Uint64
,
latestBlockHash
string
,
finalized
BlockNumber
hexutil
.
Uint64
,
safe
BlockNumber
hexutil
.
Uint64
)
bool
{
safe
BlockNumber
hexutil
.
Uint64
,
finalized
BlockNumber
hexutil
.
Uint64
)
bool
{
bs
:=
cp
.
backendState
[
be
]
bs
.
backendStateMux
.
Lock
()
changed
:=
bs
.
latestBlockHash
!=
latestBlockHash
...
...
proxyd/consensus_tracker.go
View file @
3b4bcd69
...
...
@@ -15,17 +15,17 @@ import (
type
ConsensusTracker
interface
{
GetLatestBlockNumber
()
hexutil
.
Uint64
SetLatestBlockNumber
(
blockNumber
hexutil
.
Uint64
)
GetFinalizedBlockNumber
()
hexutil
.
Uint64
SetFinalizedBlockNumber
(
blockNumber
hexutil
.
Uint64
)
GetSafeBlockNumber
()
hexutil
.
Uint64
SetSafeBlockNumber
(
blockNumber
hexutil
.
Uint64
)
GetFinalizedBlockNumber
()
hexutil
.
Uint64
SetFinalizedBlockNumber
(
blockNumber
hexutil
.
Uint64
)
}
// InMemoryConsensusTracker store and retrieve in memory, async-safe
type
InMemoryConsensusTracker
struct
{
latestBlockNumber
hexutil
.
Uint64
finalizedBlockNumber
hexutil
.
Uint64
safeBlockNumber
hexutil
.
Uint64
finalizedBlockNumber
hexutil
.
Uint64
mutex
sync
.
Mutex
}
...
...
@@ -49,32 +49,32 @@ func (ct *InMemoryConsensusTracker) SetLatestBlockNumber(blockNumber hexutil.Uin
ct
.
latestBlockNumber
=
blockNumber
}
func
(
ct
*
InMemoryConsensusTracker
)
Get
Finalized
BlockNumber
()
hexutil
.
Uint64
{
func
(
ct
*
InMemoryConsensusTracker
)
Get
Safe
BlockNumber
()
hexutil
.
Uint64
{
defer
ct
.
mutex
.
Unlock
()
ct
.
mutex
.
Lock
()
return
ct
.
finalized
BlockNumber
return
ct
.
safe
BlockNumber
}
func
(
ct
*
InMemoryConsensusTracker
)
Set
Finalized
BlockNumber
(
blockNumber
hexutil
.
Uint64
)
{
func
(
ct
*
InMemoryConsensusTracker
)
Set
Safe
BlockNumber
(
blockNumber
hexutil
.
Uint64
)
{
defer
ct
.
mutex
.
Unlock
()
ct
.
mutex
.
Lock
()
ct
.
finalized
BlockNumber
=
blockNumber
ct
.
safe
BlockNumber
=
blockNumber
}
func
(
ct
*
InMemoryConsensusTracker
)
Get
Safe
BlockNumber
()
hexutil
.
Uint64
{
func
(
ct
*
InMemoryConsensusTracker
)
Get
Finalized
BlockNumber
()
hexutil
.
Uint64
{
defer
ct
.
mutex
.
Unlock
()
ct
.
mutex
.
Lock
()
return
ct
.
safe
BlockNumber
return
ct
.
finalized
BlockNumber
}
func
(
ct
*
InMemoryConsensusTracker
)
Set
Safe
BlockNumber
(
blockNumber
hexutil
.
Uint64
)
{
func
(
ct
*
InMemoryConsensusTracker
)
Set
Finalized
BlockNumber
(
blockNumber
hexutil
.
Uint64
)
{
defer
ct
.
mutex
.
Unlock
()
ct
.
mutex
.
Lock
()
ct
.
safe
BlockNumber
=
blockNumber
ct
.
finalized
BlockNumber
=
blockNumber
}
// RedisConsensusTracker uses a Redis `client` to store and retrieve consensus, async-safe
...
...
@@ -104,13 +104,6 @@ func (ct *RedisConsensusTracker) SetLatestBlockNumber(blockNumber hexutil.Uint64
ct
.
client
.
Set
(
ct
.
ctx
,
ct
.
key
(
"latest"
),
blockNumber
,
0
)
}
func
(
ct
*
RedisConsensusTracker
)
GetFinalizedBlockNumber
()
hexutil
.
Uint64
{
return
hexutil
.
Uint64
(
hexutil
.
MustDecodeUint64
(
ct
.
client
.
Get
(
ct
.
ctx
,
ct
.
key
(
"finalized"
))
.
Val
()))
}
func
(
ct
*
RedisConsensusTracker
)
SetFinalizedBlockNumber
(
blockNumber
hexutil
.
Uint64
)
{
ct
.
client
.
Set
(
ct
.
ctx
,
ct
.
key
(
"finalized"
),
blockNumber
,
0
)
}
func
(
ct
*
RedisConsensusTracker
)
GetSafeBlockNumber
()
hexutil
.
Uint64
{
return
hexutil
.
Uint64
(
hexutil
.
MustDecodeUint64
(
ct
.
client
.
Get
(
ct
.
ctx
,
ct
.
key
(
"safe"
))
.
Val
()))
}
...
...
@@ -118,3 +111,11 @@ func (ct *RedisConsensusTracker) GetSafeBlockNumber() hexutil.Uint64 {
func
(
ct
*
RedisConsensusTracker
)
SetSafeBlockNumber
(
blockNumber
hexutil
.
Uint64
)
{
ct
.
client
.
Set
(
ct
.
ctx
,
ct
.
key
(
"safe"
),
blockNumber
,
0
)
}
func
(
ct
*
RedisConsensusTracker
)
GetFinalizedBlockNumber
()
hexutil
.
Uint64
{
return
hexutil
.
Uint64
(
hexutil
.
MustDecodeUint64
(
ct
.
client
.
Get
(
ct
.
ctx
,
ct
.
key
(
"finalized"
))
.
Val
()))
}
func
(
ct
*
RedisConsensusTracker
)
SetFinalizedBlockNumber
(
blockNumber
hexutil
.
Uint64
)
{
ct
.
client
.
Set
(
ct
.
ctx
,
ct
.
key
(
"finalized"
),
blockNumber
,
0
)
}
proxyd/integration_tests/consensus_test.go
View file @
3b4bcd69
...
...
@@ -267,8 +267,8 @@ func TestConsensus(t *testing.T) {
overrideBlock
(
"node2"
,
"safe"
,
"0xe2"
)
update
()
require
.
Equal
(
t
,
"0xc1"
,
bg
.
Consensus
.
GetFinalizedBlockNumber
()
.
String
())
require
.
Equal
(
t
,
"0xe1"
,
bg
.
Consensus
.
GetSafeBlockNumber
()
.
String
())
require
.
Equal
(
t
,
"0xc1"
,
bg
.
Consensus
.
GetFinalizedBlockNumber
()
.
String
())
})
t
.
Run
(
"advance safe and finalized"
,
func
(
t
*
testing
.
T
)
{
...
...
@@ -279,8 +279,8 @@ func TestConsensus(t *testing.T) {
overrideBlock
(
"node2"
,
"safe"
,
"0xe2"
)
update
()
require
.
Equal
(
t
,
"0xc2"
,
bg
.
Consensus
.
GetFinalizedBlockNumber
()
.
String
())
require
.
Equal
(
t
,
"0xe2"
,
bg
.
Consensus
.
GetSafeBlockNumber
()
.
String
())
require
.
Equal
(
t
,
"0xc2"
,
bg
.
Consensus
.
GetFinalizedBlockNumber
()
.
String
())
})
t
.
Run
(
"ban backend if error rate is too high"
,
func
(
t
*
testing
.
T
)
{
...
...
@@ -317,8 +317,8 @@ func TestConsensus(t *testing.T) {
overrideBlock
(
"node1"
,
"safe"
,
"0xa1"
)
update
()
require
.
Equal
(
t
,
"0xc1"
,
bg
.
Consensus
.
GetFinalizedBlockNumber
()
.
String
())
require
.
Equal
(
t
,
"0xe1"
,
bg
.
Consensus
.
GetSafeBlockNumber
()
.
String
())
require
.
Equal
(
t
,
"0xc1"
,
bg
.
Consensus
.
GetFinalizedBlockNumber
()
.
String
())
consensusGroup
:=
bg
.
Consensus
.
GetConsensusGroup
()
require
.
NotContains
(
t
,
consensusGroup
,
nodes
[
"node1"
]
.
backend
)
...
...
@@ -349,8 +349,8 @@ func TestConsensus(t *testing.T) {
update
()
require
.
Equal
(
t
,
"0x101"
,
bg
.
Consensus
.
GetLatestBlockNumber
()
.
String
())
require
.
Equal
(
t
,
"0xc1"
,
bg
.
Consensus
.
GetFinalizedBlockNumber
()
.
String
())
require
.
Equal
(
t
,
"0xe1"
,
bg
.
Consensus
.
GetSafeBlockNumber
()
.
String
())
require
.
Equal
(
t
,
"0xc1"
,
bg
.
Consensus
.
GetFinalizedBlockNumber
()
.
String
())
consensusGroup
:=
bg
.
Consensus
.
GetConsensusGroup
()
require
.
NotContains
(
t
,
consensusGroup
,
nodes
[
"node1"
]
.
backend
)
...
...
@@ -365,8 +365,8 @@ func TestConsensus(t *testing.T) {
update
()
require
.
Equal
(
t
,
"0x101"
,
bg
.
Consensus
.
GetLatestBlockNumber
()
.
String
())
require
.
Equal
(
t
,
"0xc1"
,
bg
.
Consensus
.
GetFinalizedBlockNumber
()
.
String
())
require
.
Equal
(
t
,
"0xe1"
,
bg
.
Consensus
.
GetSafeBlockNumber
()
.
String
())
require
.
Equal
(
t
,
"0xc1"
,
bg
.
Consensus
.
GetFinalizedBlockNumber
()
.
String
())
consensusGroup
:=
bg
.
Consensus
.
GetConsensusGroup
()
require
.
NotContains
(
t
,
consensusGroup
,
nodes
[
"node1"
]
.
backend
)
...
...
@@ -397,8 +397,8 @@ func TestConsensus(t *testing.T) {
require
.
Equal
(
t
,
1
,
len
(
consensusGroup
))
require
.
Equal
(
t
,
"0xd1"
,
bg
.
Consensus
.
GetLatestBlockNumber
()
.
String
())
require
.
Equal
(
t
,
"0x91"
,
bg
.
Consensus
.
GetFinalizedBlockNumber
()
.
String
())
require
.
Equal
(
t
,
"0xb1"
,
bg
.
Consensus
.
GetSafeBlockNumber
()
.
String
())
require
.
Equal
(
t
,
"0x91"
,
bg
.
Consensus
.
GetFinalizedBlockNumber
()
.
String
())
})
t
.
Run
(
"latest dropped below safe, then recovered"
,
func
(
t
*
testing
.
T
)
{
...
...
@@ -424,8 +424,8 @@ func TestConsensus(t *testing.T) {
require
.
Equal
(
t
,
1
,
len
(
consensusGroup
))
require
.
Equal
(
t
,
"0xd1"
,
bg
.
Consensus
.
GetLatestBlockNumber
()
.
String
())
require
.
Equal
(
t
,
"0x91"
,
bg
.
Consensus
.
GetFinalizedBlockNumber
()
.
String
())
require
.
Equal
(
t
,
"0xb1"
,
bg
.
Consensus
.
GetSafeBlockNumber
()
.
String
())
require
.
Equal
(
t
,
"0x91"
,
bg
.
Consensus
.
GetFinalizedBlockNumber
()
.
String
())
})
t
.
Run
(
"latest dropped below safe, and stayed inconsistent"
,
func
(
t
*
testing
.
T
)
{
...
...
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment